"Message Broker" con RabbitMQ para microservicios

El “intermediario de mensajes” (message broker) representa un elemento fundamental en una arquitectura de microservicios, ya que permite la comunicación entre ellos como un mediador. Idealmente es capaz de sustituir todo contacto directo entre ellos ya que reduce el número de conexiones punto-a-punto. Esto, a su vez, simplifica el “análisis de impacto” (impact analysis) en toda la arquitectura.

En esté artículo trataremos, a través de un ejemplo, implementar un “intermediario de mensajes” utilizando como tecnología de gestión de colas a RabbitMQ.

El negocio

Para el ejemplo partimos de un negocio donde se gestionan productos y comentarios (los comentarios se le asignan a los productos con una evaluación), como arquitectura de microservicios tendríamos entonces dos de ellos, ms_product y ms_review. La problemática es que siempre queremos representar en productos (al menos), la cantidad de comentarios y el promedio de la valoración del producto (1 a 5). Es aquí donde juega un papel fundamentar nuestro “intermediario de mensajes“. Analicemos la siguiente gráfica:

Message Broker con RabbitMQ. Elaboración propia

La misma representa la forma en que se establece la comunicación entre los microservicios a través de una cola de mensajes de RabbitMQ, entre ellos nunca se comunican. Cuando se listan los productos en ms_product se selecciona uno y con ese identificador se procede a realizar el comentario con la evaluación del mismo en el ms_review, en ese instante se envía una mensaje a la cola con el identificador del producto y los datos actualizados de los comentarios numberReviews y reviewAverage (promedio de evaluación), al instante el ms_product con ese identificador actualiza los detalles del product.

Veamos las principales clases que hacen posible el “intermediario de mensajes

Paquete común “common”

Este paquete tiene la clase que manejará los datos del mensaje y que ambos microservicios utilizarán:

package cu.sacavix.springboot.rabbitmq.message.broker.product;
import java.io.Serializable;
public class ReviewProductMessage implements Serializable {
    private String id ;  // identificador del producto
    private float avg ;
    private int amount ;
    public ReviewProductMessage() {}
    public ReviewProductMessage(String id, float avg, int amount) {
        this.id = id;
        this.avg = avg;
        this.amount = amount;
    }
    public String getId() {
        return id;
    }
.
.
.
Microservicio Review

Este microservicio es el encargado de producir el mensaje ReviewProductMessage. En el recurso que brinda el endpoint para crear un review es donde se manda el mensaje:

Clase ReviewResource

package cu.sacavix.springboot.rabbitmq.message.broker.reviews.web;
import cu.sacavix.springboot.rabbitmq.message.broker.product.ReviewProductMessage;
import cu.sacavix.springboot.rabbitmq.message.broker.reviews.entity.Review;
import cu.sacavix.springboot.rabbitmq.message.broker.reviews.repository.ReviewRepository;
import cu.sacavix.springboot.rabbitmq.message.broker.reviews.service.ReviewProducerService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import java.util.List;
@Controller
@RequestMapping("/api")
public class ReviewResource {
    @Autowired
    private ReviewRepository reviewRepository ;
    @Autowired
    private ReviewProducerService reviewProducerService ;
    @CrossOrigin(origins = "*")
    @PostMapping(value = "/review")
    @ApiOperation(value= "Create review", response = Review.class)
    public ResponseEntity<?> create(@RequestBody Review review) {
        if (review.getValue() >= 0 && review.getValue() <=5 ) {
            Review r = reviewRepository.save(review) ;
            createReviewProductMessage(r) ;
            return ResponseEntity.ok(r) ;
        } else {
            return ResponseEntity.badRequest().body("The evaluation should be an integer betwen 0 - 5");
        }
    }
    private void createReviewProductMessage(Review r) {
        List<Review> reviews = this.reviewRepository.findByResourceId(r.getResourceId());
        int total = 0;
        for (Review review : reviews) {
            total += review.getValue();
        }
        float average = total/reviews.size() ;
        average = ((int) (average * 10)) / 10.0f;
        reviewProducerService.message(new ReviewProductMessage(r.getResourceId(), average, reviews.size()));
    }
}

En la línea 35 se realiza la llamada de la funcionalidad createReviewProductMessage y en las línea 50 se llama el servicio encargado de enviar el mensaje.

Clase ReviewProducerService

package cu.sacavix.springboot.rabbitmq.message.broker.reviews.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import cu.sacavix.springboot.rabbitmq.message.broker.product.ReviewProductMessage;
import cu.sacavix.springboot.rabbitmq.message.broker.reviews.configuration.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ReviewProducerService {
    private final ObjectMapper mapper = new ObjectMapper() ;
    public static final Logger logger = LoggerFactory.getLogger(ReviewProducerService.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void message(ReviewProductMessage message) {
        try {
            logger.info("Enviando un nuevo review creado para: [{}] ", message.getId());
            rabbitTemplate.convertAndSend(Constants.REVIEW_CREATED_QUEUE, mapper.writeValueAsString(message));
        } catch (JsonProcessingException e) {
            logger.error(e.getMessage());
        }
    }
}

Estas líneas de código son suficientes para enviar el mensaje para la cola REVIEW_CREATED_QUEUE, convirtiendo el mensaje ReviewProductMessage al formato JSON, la configuración de la cola se inicializa en la clase Configuration, donde REVIEW_CREATED_QUEUE es una constante literal con el valor “created.review

package cu.sacavix.springboot.rabbitmq.message.broker.reviews.configuration;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Configuration {
    @Bean
    public Queue queue1() {
        return new Queue(Constants.REVIEW_CREATED_QUEUE);
    }
}

Si iniciamos este servicio sin tener el cosumidor (ms_product) funcionando y creamos review, entonces los mensajes se encolarán hasta que sean consumidos, veamos esto en las siguientes tres imágenes:

Creación de review s través del Swagger en ms_review

Desde el swagger del ms_review creamos 3 review.

Registros en la consola de los tres mensajes enviados “Enviando un nuevo …”

Aquí podemos ver como se envían para la cola los tres mensajes con el identificador del producto “identificador xxx”, esto solo es ilustrativo, en estos momentos no existen productos con ese identificador, lo que provocará que una vez arrancado el consumidor, los descartará. Veamos ahora los mensajes en espera en la consola de administración de RabbitMQ:

Consola RabbitMQ

Vean que en la cola created.review existen 3 mensajes en estado “Ready“. Solo queda que el microservicio ms_product lo consuma.

Microservicio Product

En este microservicio, la clase más relevante relacionada con el intermediario de mensajes en ReviewProductConsumerService, veamos su código:

package cu.sacavix.springboot.rabbitmq.message.broker.product.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import cu.sacavix.springboot.rabbitmq.message.broker.product.ReviewProductMessage;
import cu.sacavix.springboot.rabbitmq.message.broker.product.configuration.Constants;
import cu.sacavix.springboot.rabbitmq.message.broker.product.entity.Product;
import cu.sacavix.springboot.rabbitmq.message.broker.product.repository.ProductRepository;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Optional;
@Component
public class ReviewProductConsumerService {
    private final ObjectMapper mapper = new ObjectMapper() ;
    @Autowired
    private ProductRepository productRepository ;
    /**
     * Receiving notifications to create the person
     */
    @RabbitListener(queues = Constants.REVIEW_CREATED_QUEUE)
    public void rabbitListener(String in) {
        try {
            ReviewProductMessage reviewProductMessage = mapper.readValue(in, ReviewProductMessage.class);
            Optional<Product> o = productRepository.findById(reviewProductMessage.getId()) ;
            if(o.isPresent()) {
                Product product = o.get() ;
                product.setNumberReviews(reviewProductMessage.getAmount());
                product.setReviewAverage(reviewProductMessage.getAvg());
                productRepository.saveAndFlush(product) ;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

@RabbitListener es la encargada de escuchar los nuevos mensajes de la cola REVIEW_CREATED_QUEUE y procesarlos en la función rabbitListener, donde obtenemos en JSON, lo convertimos a la clase ReviewProductMessage y actualizamos el producto con ese identificador.

Este es un ejemplo sencillo de como se pueden comunicar dos microservicios a través de un intermediario de mensajes, para este caso, los microservicios pueden combinar roles, o sea, ser consumidores y productores a la vez, un ejemplo muy válido, es que cuando se elimine un producto en el ms_product, se envíe un mensaje para la cola con ese identificador, para que el ms_review con el rol de consumidor, elimine todos los reviews asociados a este producto utilizando otra cola.

En este enlace les dejo el proyecto message-broker-with-rabbitmq para que lo revisen, sobre todo, las configuraciones para la comunicación con RabbitMQ, donde recuerden que deben haber creado un host virtual y credenciales con acceso a este host virtual en la consola de RabbitMQ o a través del cli de terminal rabbitmqctl.