Comunicando microservicios mediante Apache Kafka (+Ejemplo)

Apache Kafka provee un sistema de mensajería mediante un bus de mensajes altamente escalable y resiliente. En entradas anteriores hablamos sobre como instalar Kafka & Zookeeper desde Docker y sobre Windows directamente.

En esta entrada expondremos como desarrollar una aplicación que consta de dos microservicios y entre ellos se comunican usando Kafka mediante el paso de eventos, un intento bastante simplista pero real de comunicación basada en eventos.

El problema a resolver será:

Implementar la sección de una plataforma basada en microservicios que gestiona clientes y bajo ciertas condiciones debe enviar notificaciones por email a los mismos.

Para esto vamos a implementar dos pequeños servicios “Customers” & “Notifications”.

Customers recibe por API REST una notificación de creación de un cliente y posteriormente publica un mensaje de “CUSTOMER CREATED” en el bus de mensajes, el consumidor en esta arquitectura simplificada será Notifications, que recibe el mensaje y lo procesa de manera totalmente asíncrona y desacoplada.

Como elementos comunes ambos microservicios, para el objetivo central que es la comunicación con Kafka usaran la misma dependencia:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

Puedes ver este artículo más ampliamente explicado mediante este vídeo en nuestro canal de Youtube.

Servicio Customers: Creando un publicador Kafka

Importante: Antes de seguir, sino dispone de Kafka en ejecución lea los siguientes artículos:

Luego de crear un proyecto Spring Boot y agregada la dependencia de Spring Kafka, entonces el elemento fundamental para poder conectarnos a Kafka es crear el Bean de KafkaTemplate, esto pasa por definir nuestro ProducerFactory, que indicará que tipo de datos producirá nuestra aplicación y enviará hacia el bus de mensajes. Para ello creamos una clase de configuración similar a la que se muestra:

import com.sacavix.events.Event;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

	private final String bootstrapAddress = "localhost:9092";

    @Bean
    public ProducerFactory<String, Event<?>> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          StringSerializer.class);
        configProps.put(
        	      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        	      JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Event<?>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Una vez que tengamos el template de kafka creado, solo necesitamos inyectarlo en las clases especificas donde lo vamos a usar y usar el método send, el siguiente código muestra como usarlo, y es un snippet de este proyecto completo que te compartimos en GitHub.

import com.sacavix.events.CustomerCreatedEvent;
import com.sacavix.events.Event;
import com.sacavix.events.EventType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.sacavix.entity.Customer;

import java.util.Date;
import java.util.UUID;

@Component
public class CustomerEventsService {

	@Autowired
	private KafkaTemplate<String, Event<?>> producer;

	@Value("${topic.customer.name:customers}")
	private String topicCustomer;

	public void publish(Customer customer) {

		CustomerCreatedEvent created = new CustomerCreatedEvent();
		created.setData(customer);
		created.setId(UUID.randomUUID().toString());
		created.setType(EventType.CREATED);
		created.setDate(new Date());

		this.producer.send(topicCustomer, created);
	}
}

Servicio Notifications: Creando un consumidor Kafka

Similar al productor, la creación de un consumidor depende de la creación de un Listener (o varios) que se capaz de suscribirse y escuchar (pullear) los eventos que llegan a uno o varios tópicos de kafka, los listener se implementan mediante la interfaz GenericMessageListenerContainer y de una implementación particular como es el caso de ConcurrentKafkaListenerContainerFactory. Se emplea un ConsumerFactory que define las propiedades para la conexión al bus de mensajes, la forma de deserializar la información entre muchas otras posibles configuraciones y parámetros.

Todo esto lo podemos agrupa en una clase de configuración como la siguiente:

import com.sacavix.events.Event;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

	private final String bootstrapAddress = "localhost:9092";

    @Bean
    public ConsumerFactory<String, Event<?>> consumerFactory() {
        Map<String, String> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(JsonSerializer.TYPE_MAPPINGS,"com.sacavix:com.sacavix.events.Event");

        final JsonDeserializer<Event<?>> jsonDeserializer = new JsonDeserializer<>();
        return new DefaultKafkaConsumerFactory(
                props,
                new StringDeserializer(),
                jsonDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event<?>>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Event<?>> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Importante resaltar el @EnableKafka, en el consumidor si es mandatorio agregarlo.

Una vez creado el listener, para que efectivamente se comience a leer de Kafka debemos usar la anotación @KafkaListener , parametrizarla y listo. El siguiente segmento de código forma parte del mismo ejemplo que tenemos en GitHub.

import com.sacavix.events.CustomerCreatedEvent;
import com.sacavix.events.Event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class
CustomerEventsService {

	@KafkaListener(
			topics = "${topic.customer.name:customers}",
			containerFactory = "kafkaListenerContainerFactory",
	groupId = "grupo1")
	public void consumer(Event<?> event) {
		if (event.getClass().isAssignableFrom(CustomerCreatedEvent.class)) {
			CustomerCreatedEvent customerCreatedEvent = (CustomerCreatedEvent) event;
			log.info("Received Customer created event .... with Id={}, data={}",
					customerCreatedEvent.getId(),
					customerCreatedEvent.getData().toString());
		}

	}
}

Es muy importante darse cuenta que kafkaListenerContainerFactory coincide con el Bean que hemos creado en la clase configuración. El parámetro topic define el tópico desde el cual estamos escuchando mensajes y el grupo es el consumer group o grupo de consumidores.

La forma en que hemos mostrado el tópico es usando SpEL, te recomiendo este artículo donde explicamos todo: SpEL, lenguaje de expresiones de Spring Framework.

Espero te sea útil el artículo y que aprendas con el lo básico de Spring Boot + Kafka.