Spring Kafka
Introduction
Apache Kafka is a distributed streaming platform that lets you publish, subscribe to, store, and process streams of records in real time. It is designed for high throughput, fault tolerance, and horizontal scalability. Spring Kafka (the Spring for Apache Kafka project) provides integration with the Kafka messaging system, making it easier to work with Kafka in Spring applications.
In this tutorial, we'll explore how to use Spring Kafka to:
- Set up producers to send messages to Kafka topics
- Configure consumers to receive messages from Kafka topics
- Process messages in real time
- Handle errors and implement retry mechanisms
By the end of this guide, you'll have a solid understanding of how to integrate Kafka into your Spring applications for building event-driven systems.
Prerequisites
Before we start, you should have:
- Basic knowledge of Spring Framework
- Java Development Kit (JDK) 8 or higher
- Maven or Gradle
- A local Kafka installation or Docker to run Kafka
Setting up Spring Kafka
Adding Dependencies
To use Spring Kafka in your project, you need to add the appropriate dependencies.
For Maven:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.9</version>
</dependency>
For Gradle:
implementation 'org.springframework.kafka:spring-kafka:3.0.9'
Basic Configuration
First, let's configure Spring Kafka in our application. Create a configuration class:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
// Producer configuration
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// Consumer configuration
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Start from the earliest offset when the group has no committed offset yet
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
You can also configure Spring Kafka using application properties:
# Producer properties
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
Producing Messages
Now that we've configured Spring Kafka, let's create a producer to send messages to a Kafka topic.
Simple Message Producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Message sent: " + message);
}
// Send message with key
public void sendMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
System.out.println("Message sent with key " + key + ": " + message);
}
}
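When you send a record with a key, Kafka hashes the key to choose a partition, so all records with the same key land on the same partition and are consumed in order. The sketch below illustrates that "same key, same partition" guarantee. Note this is a simplification for illustration only: Kafka's default partitioner actually uses murmur2 over the serialized key bytes, not a Java-style hash.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitioningSketch {

    // Simplified key-to-partition mapping. Kafka's DefaultPartitioner uses
    // murmur2 hashing; a plain polynomial hash is used here only to show
    // that a fixed key always maps to the same partition.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) {
            hash = 31 * hash + b;
        }
        return Math.abs(hash % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // The same key always maps to the same partition
        System.out.println(partitionFor("order-42", partitions)
                == partitionFor("order-42", partitions)); // true
    }
}
```

This is why the keyed overload of sendMessage matters: records that must stay ordered relative to each other should share a key.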
REST Controller to Send Messages
Let's create a simple REST controller to send messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
private final KafkaProducer kafkaProducer;
@Autowired
public KafkaController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@PostMapping("/publish")
public String sendMessage(@RequestParam("message") String message) {
kafkaProducer.sendMessage("my-topic", message);
return "Message sent: " + message;
}
}
Sending Object Messages
To send objects instead of plain strings, you need a JSON serializer. Create a dedicated producer configuration for the object type:
import org.springframework.kafka.support.serializer.JsonSerializer;
// other imports...
@Configuration
public class KafkaJsonConfig {
@Bean
public ProducerFactory<String, User> userProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
return new KafkaTemplate<>(userProducerFactory());
}
}
Now you can create a producer to send User objects:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class UserKafkaProducer {
private final KafkaTemplate<String, User> kafkaTemplate;
@Autowired
public UserKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendUser(String topic, User user) {
kafkaTemplate.send(topic, user);
System.out.println("User sent: " + user.getName());
}
}
Define a simple User class:
public class User {
private Long id;
private String name;
private String email;
// Constructors, getters, and setters
public User() {}
public User(Long id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
// Getters and setters
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
@Override
public String toString() {
return "User{id=" + id + ", name='" + name + "', email='" + email + "'}";
}
}
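On the wire, JsonSerializer uses Jackson to turn the User into a JSON byte array. The sketch below hand-rolls that mapping (no Jackson dependency) purely to show the shape of the record value a consumer will receive; the real serializer additionally handles escaping, null fields, and optional type headers.

```java
public class UserJsonSketch {

    // Hand-rolled rendering of the User fields, mirroring the JSON shape
    // Jackson's JsonSerializer would produce (without escaping or null handling).
    static String toJson(long id, String name, String email) {
        return "{\"id\":" + id
                + ",\"name\":\"" + name + "\""
                + ",\"email\":\"" + email + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJson(1L, "Alice", "alice@example.com"));
        // {"id":1,"name":"Alice","email":"alice@example.com"}
    }
}
```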
Consuming Messages
Now, let's create a consumer to receive messages from Kafka topics.
Simple Message Consumer
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
Consuming JSON Messages
To consume JSON objects, configure a JSON deserializer:
import org.springframework.kafka.support.serializer.JsonDeserializer;
// other imports...
@Configuration
public class KafkaJsonConsumerConfig {
@Bean
public ConsumerFactory<String, User> userConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-group");
// The deserializer instances passed below take precedence over any
// deserializer class properties, so those entries are omitted here.
// JsonDeserializer(User.class) also needs no trusted-packages setting,
// because the target type is fixed.
return new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(userConsumerFactory());
return factory;
}
}
Now create a consumer to receive User objects:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class UserKafkaConsumer {
@KafkaListener(
topics = "user-topic",
groupId = "user-group",
containerFactory = "userKafkaListenerContainerFactory"
)
public void listenUserMessages(User user) {
System.out.println("Received user: " + user);
// Process the user object
}
}
Advanced Kafka Features
Batch Processing
You can also process messages in batches:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class BatchKafkaConsumer {
@KafkaListener(
topics = "batch-topic",
groupId = "batch-group",
containerFactory = "batchKafkaListenerContainerFactory"
)
public void listenBatch(List<ConsumerRecord<String, String>> records) {
System.out.println("Batch received, size: " + records.size());
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value() + " from partition " + record.partition());
}
}
}
And the corresponding configuration:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // Enable batch listening
return factory;
}
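Batch listening trades a little latency for much lower per-record overhead: one listener invocation and one acknowledgment path per poll instead of per record. It also enables whole-batch work such as aggregation. The sketch below (plain Java, no Kafka dependency) shows the kind of one-pass aggregation a batch listener makes easy, with plain ints standing in for record.partition() values:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchAggregationSketch {

    // Count how many records in a polled batch came from each partition.
    // In a real batch listener the ints would be record.partition() values.
    static Map<Integer, Integer> countPerPartition(List<Integer> partitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int p : partitions) {
            counts.merge(p, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countPerPartition(List.of(0, 1, 0, 2, 0))); // {0=3, 1=1, 2=1}
    }
}
```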
Error Handling
Error handling is crucial in message processing. Spring Kafka provides several ways to handle errors:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Configure an error handler. The old setErrorHandler method was removed
// in Spring Kafka 3.0; use a CommonErrorHandler such as DefaultErrorHandler.
// The lambda is invoked once retries (if any) are exhausted.
factory.setCommonErrorHandler(new DefaultErrorHandler((record, exception) -> {
System.err.println("Error in message consumption: " + exception.getMessage());
System.err.println("Record: " + record);
}));
return factory;
}
Retry Mechanism
For transient errors, you can configure a retry mechanism:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Configure retries. The RetryTemplate-based setRetryTemplate and
// setRecoveryCallback methods were removed in Spring Kafka 3.0;
// DefaultErrorHandler with a BackOff replaces them.
// FixedBackOff(1000L, 2L): wait 1 second between attempts and retry twice
// after the initial failure, i.e. 3 delivery attempts in total.
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(record, exception) -> System.err.println(
"Recovery - Message failed after multiple retries: " + record.value()),
new FixedBackOff(1000L, 2L));
factory.setCommonErrorHandler(errorHandler);
return factory;
}
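With a 1-second fixed back-off and three total attempts, a failing record is delivered at t = 0s, retried at t = 1s and t = 2s, and only then handed to the recovery logic. A minimal plain-Java sketch (no Spring dependency) of that schedule:

```java
import java.util.ArrayList;
import java.util.List;

public class RetryScheduleSketch {

    // Compute the delivery times (in ms) implied by a fixed back-off policy:
    // maxAttempts total tries, backOffPeriodMs between consecutive tries.
    static List<Long> deliveryTimes(int maxAttempts, long backOffPeriodMs) {
        List<Long> times = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            times.add(attempt * backOffPeriodMs);
        }
        return times;
    }

    public static void main(String[] args) {
        // 3 attempts, 1 second apart
        System.out.println(deliveryTimes(3, 1000)); // [0, 1000, 2000]
    }
}
```

Fixed back-off is the simplest policy; for flaky downstream systems an exponential back-off (doubling the wait each attempt) is often a better fit.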
Real-world Example: Notification System
Let's create a practical example of a notification system using Spring Kafka. We'll build a service that:
- Receives notification requests
- Processes them via Kafka
- Sends notifications to different channels (email, SMS, etc.)
Domain Models
First, create a Notification model:
public class Notification {
private String id;
private String recipient;
private String content;
private NotificationType type;
private boolean sent;
// Constructors, getters, and setters
public enum NotificationType {
EMAIL, SMS, PUSH
}
}
Notification Producer
@Service
public class NotificationProducer {
private final KafkaTemplate<String, Notification> kafkaTemplate;
@Autowired
public NotificationProducer(KafkaTemplate<String, Notification> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendNotification(Notification notification) {
// Use the notification type as the key so all notifications of one
// type go to the same partition, preserving their relative order
String key = notification.getType().toString();
// send() returns a CompletableFuture in Spring Kafka 3.x
// (it returned a ListenableFuture in 2.x)
CompletableFuture<SendResult<String, Notification>> future =
kafkaTemplate.send("notifications", key, notification);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Notification sent: " + notification.getId());
} else {
System.err.println("Failed to send notification: " + ex.getMessage());
}
});
}
}
Notification Consumer
@Service
public class NotificationConsumer {
private final EmailService emailService;
private final SMSService smsService;
private final PushNotificationService pushService;
@Autowired
public NotificationConsumer(
EmailService emailService,
SMSService smsService,
PushNotificationService pushService) {
this.emailService = emailService;
this.smsService = smsService;
this.pushService = pushService;
}
@KafkaListener(
topics = "notifications",
groupId = "notification-processor",
containerFactory = "notificationKafkaListenerContainerFactory"
)
public void processNotification(Notification notification) {
System.out.println("Processing notification: " + notification.getId());
try {
switch(notification.getType()) {
case EMAIL:
emailService.sendEmail(notification);
break;
case SMS:
smsService.sendSMS(notification);
break;
case PUSH:
pushService.sendPushNotification(notification);
break;
default:
System.err.println("Unknown notification type: " + notification.getType());
}
// Mark notification as sent
notification.setSent(true);
} catch (Exception e) {
System.err.println("Failed to process notification: " + e.getMessage());
// In a real application, you might want to handle retries or dead-letter topics
}
}
}
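The switch on NotificationType works, but it has to be edited every time a channel is added. An alternative design (a sketch, not part of the original code) is to register one handler per type in an EnumMap, so adding a channel means adding one entry. The example below uses strings in place of the real EmailService/SMSService/PushNotificationService calls so it is self-contained:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

public class NotificationDispatchSketch {

    enum NotificationType { EMAIL, SMS, PUSH }

    // One handler per type; in the real consumer each entry would delegate
    // to the corresponding channel service instead of returning a string.
    static final Map<NotificationType, Function<String, String>> HANDLERS =
            new EnumMap<>(NotificationType.class);
    static {
        HANDLERS.put(NotificationType.EMAIL, msg -> "email: " + msg);
        HANDLERS.put(NotificationType.SMS, msg -> "sms: " + msg);
        HANDLERS.put(NotificationType.PUSH, msg -> "push: " + msg);
    }

    // Table lookup replaces the switch; getOrDefault covers unknown types.
    static String dispatch(NotificationType type, String msg) {
        return HANDLERS.getOrDefault(type, m -> "unknown type").apply(msg);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(NotificationType.SMS, "hello"));
    }
}
```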
REST API to Send Notifications
@RestController
@RequestMapping("/notifications")
public class NotificationController {
private final NotificationProducer notificationProducer;
@Autowired
public NotificationController(NotificationProducer notificationProducer) {
this.notificationProducer = notificationProducer;
}
@PostMapping
public ResponseEntity<String> sendNotification(@RequestBody Notification notification) {
notification.setId(UUID.randomUUID().toString());
notification.setSent(false);
notificationProducer.sendNotification(notification);
return ResponseEntity.accepted()
.body("Notification queued with ID: " + notification.getId());
}
}
Summary
In this tutorial, we've covered the essentials of Spring Kafka:
- Configuration: Setting up Spring Kafka with producers and consumers
- Producing Messages: Sending string and JSON messages to Kafka topics
- Consuming Messages: Receiving messages with the @KafkaListener annotation
- Advanced Features: Batch processing, error handling, and retry mechanisms
- Real-world Example: Building a notification system using Spring Kafka
Spring Kafka simplifies working with Apache Kafka in your Spring applications, allowing you to build scalable, event-driven systems with minimal boilerplate code.
Additional Resources
To continue learning about Spring Kafka:
- Spring Kafka Official Documentation
- Apache Kafka Official Documentation
- Spring Kafka GitHub Repository
Exercises
- Create a simple chat application using Spring Kafka where users can send messages to different chat rooms.
- Implement a dead-letter topic system that captures and logs failed message processings.
- Create a metrics system that tracks the number of messages processed by your Kafka consumers.
- Implement a message filtering system that only processes messages matching certain criteria.
- Set up a multi-broker Kafka cluster and test your Spring Kafka application with it.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)