Spring Boot + RabbitMQ + Error Handling Example
In this article we will implement RabbitMQ error handling. A message can fail when it carries data that the receiver does not accept, or when it is sent to a queue that does not exist. Such a message is retried up to a set number of times. If it still cannot be processed, even though the sender dispatched it successfully, it is treated as undeliverable and moved to a dead letter queue.
Spring AMQP's retry support, combined with RabbitMQ's dead lettering, provides an efficient way of handling such message failures.
What is a Dead Message in RabbitMQ?
Some messages become undeliverable or remain unhandled even though the broker has received them. This can occur when a message stays in a queue longer than its time to live (TTL), when the queue reaches its capacity, or when the consumer negatively acknowledges the message. Such a message is known as a dead message.
There is a better way to handle this situation than dropping the message. Setting up a RabbitMQ dead letter exchange and a dead letter queue allows orphaned messages to be stored and processed, so they no longer have to be lost completely.
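To make the three conditions above concrete, here is a minimal plain-Java sketch. It is a simplified model for illustration only, not RabbitMQ's implementation: the class name, the enum values, and the order in which the checks are applied are all assumptions.

```java
/** Hypothetical, simplified model of when a message becomes a dead message. */
class DeadLetterRuleSketch {

    /** NONE means the message simply stays in the queue. */
    enum Reason { REJECTED, EXPIRED, QUEUE_FULL, NONE }

    static Reason classify(boolean negativelyAcknowledged, long ageMillis, long ttlMillis,
                           int queueDepth, int maxLength) {
        if (negativelyAcknowledged) {
            return Reason.REJECTED;   // the consumer refused the message without requeueing it
        }
        if (ageMillis > ttlMillis) {
            return Reason.EXPIRED;    // the message waited longer than its TTL
        }
        if (queueDepth > maxLength) {
            return Reason.QUEUE_FULL; // the queue is over its configured capacity
        }
        return Reason.NONE;
    }

    public static void main(String[] args) {
        // A message that sat in the queue for 61s with a 60s TTL.
        System.out.println(classify(false, 61_000, 60_000, 1, 100));
    }
}
```

In a real broker each of these cases ends the same way when a dead letter exchange is configured: the message is republished to that exchange instead of being discarded.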
In this example, if the employee experience passed as a RequestParam does not meet the criteria, the retry and error handling mechanism is invoked and, once the retries are exhausted, the message is routed to the dead letter queue.
Let's start developing the Spring Boot Producer and Consumer applications.
Producer
Project Structure
Maven Dependencies
We will use the Spring AMQP dependency to develop AMQP-based messaging solutions.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.1</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.techgeeknext</groupId>
    <artifactId>SpringBootRabbitMQProducer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootRabbitMQProducer</name>
    <description>Spring Boot RabbitMQ + Error Handling</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Employee Model Class
The model uses Lombok, which generates the getters and setters automatically.
package com.techgeeknext.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Setter
@ToString
@Getter
@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id",
        scope = Employee.class)
public class Employee {
    private String name;
    private String domain;
    private int experience;
}
RabbitMQ Configuration Class
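We need the x-dead-letter-exchange and x-dead-letter-routing-key queue arguments in order to configure dead lettering. They instruct the broker to republish dead messages from techgeeknext.queue to deadLetterExchange using the routing key deadLetter.
The dlq queue (deadLetter.queue) is bound to deadLetterExchange, and the main queue (techgeeknext.queue) is bound to techgeeknextExchange with the routing key techgeeknext.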
The Jackson2JsonMessageConverter translates an object to JSON and then from JSON to a Java object.
package com.techgeeknext.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQProducerConfig {

    // Exchange that receives dead messages.
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("deadLetterExchange");
    }

    // Exchange the producer publishes to.
    @Bean
    DirectExchange exchange() {
        return new DirectExchange("techgeeknextExchange");
    }

    // Queue that stores dead messages.
    @Bean
    Queue dlq() {
        return QueueBuilder.durable("deadLetter.queue").build();
    }

    // Main queue; dead messages are republished to deadLetterExchange with key "deadLetter".
    @Bean
    Queue queue() {
        return QueueBuilder.durable("techgeeknext.queue")
                .withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", "deadLetter").build();
    }

    @Bean
    Binding DLQbinding() {
        return BindingBuilder.bind(dlq())
                .to(deadLetterExchange()).with("deadLetter");
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(exchange()).with("techgeeknext");
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Annotated with @Bean (missing before) so that this template is actually registered.
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
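To see how the two bindings in the configuration above decide where a message lands, here is a small in-memory sketch of direct exchange routing. It assumes only the documented rule that a direct exchange delivers a message to the queues whose binding key equals the routing key. The class DirectExchangeSketch is hypothetical and does not talk to a broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a direct exchange; not a Spring AMQP class. */
class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch techgeeknextExchange = new DirectExchangeSketch();
        techgeeknextExchange.bind("techgeeknext.queue", "techgeeknext");

        DirectExchangeSketch deadLetterExchange = new DirectExchangeSketch();
        deadLetterExchange.bind("deadLetter.queue", "deadLetter");

        System.out.println(techgeeknextExchange.route("techgeeknext"));
        System.out.println(deadLetterExchange.route("deadLetter"));
    }
}
```

A routing key with no matching binding returns an empty list here, which mirrors why the producer and the queue arguments must use exactly the keys techgeeknext and deadLetter.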
Controller
The producer method sends the message to the consumer as an AMQP message.
package com.techgeeknext.controller;

import com.techgeeknext.model.Employee;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitMQProducerController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @GetMapping(value = "/sendMessage")
    public String producer(@RequestParam("name") String name, @RequestParam("domain") String domain,
                           @RequestParam("exp") int exp) {
        Employee emp = new Employee();
        emp.setName(name);
        emp.setDomain(domain);
        emp.setExperience(exp);
        // The convertAndSend method converts the Java object to an AMQP message
        // and then sends this message to the exchange with the given routing key.
        amqpTemplate.convertAndSend("techgeeknextExchange", "techgeeknext", emp);
        return "Message sent to RabbitMQ server successfully!!";
    }
}
Consumer
Project Structure
Maven Dependencies
We will use the Spring AMQP dependency to develop AMQP-based messaging solutions.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.techgeeknext</groupId>
    <artifactId>SpringBootRabbitMQConsumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootRabbitMQConsumer</name>
    <description>Spring Boot RabbitMQ + Error Handling Example</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Properties
Here in the application.yml file, we have activated the RabbitMQ listener retry mechanism for Spring Boot.
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          ## The first retry happens after an interval of 3s.
          initial-interval: 3s
          ## The message is attempted at most 6 times (the first delivery plus 5 retries).
          ## It is then sent to the dead letter queue.
          max-attempts: 6
          ## The interval between two retries is never more than 10s.
          max-interval: 10s
          ## Each interval is 2 times the previous one,
          ## but the maximum interval can never be exceeded.
          multiplier: 2
          ## So the retry intervals will be 3s, 6s, 10s, 10s, 10s, as 10s is the max interval specified.
server:
  port: 8081
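The interval sequence that results from the application.yml settings above can be checked with a few lines of plain Java. This is only a sketch of the arithmetic (multiply the previous interval, then cap it at the maximum), not Spring's own retry code, and the class RetryBackoffSketch is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that reproduces the backoff arithmetic; not part of either project. */
class RetryBackoffSketch {

    /**
     * Returns the waits (in milliseconds) between consecutive attempts.
     * maxAttempts includes the first delivery, so there are maxAttempts - 1 waits.
     */
    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double next = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(next, maxMs)); // never wait longer than the cap
            next = next * multiplier;                // grow the interval for the next retry
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from application.yml: 3s initial interval, multiplier 2, 10s cap, 6 attempts.
        System.out.println(intervals(3000, 2.0, 10000, 6));
        // prints [3000, 6000, 10000, 10000, 10000]
    }
}
```

The third wait would be 12s without the cap, which is why every wait from the third onward is 10s.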
Custom Checked Exception
Create a custom checked exception called CustomInvalidException.
package com.techgeeknext.exception;

public class CustomInvalidException extends Exception {
    private static final long serialVersionUID = -3154618962130084535L;
}
RabbitMQ Consumer
Create the class EmployeeConsumerService, which uses @RabbitListener to consume messages from RabbitMQ. The listener waits for incoming messages on a RabbitMQ queue; the only basic configuration it needs is the name of the queue from which the messages should be consumed.
We'll also check the employee experience field here and throw CustomInvalidException in the case of a negative or invalid experience.
package com.techgeeknext.service;

import com.techgeeknext.exception.CustomInvalidException;
import com.techgeeknext.model.Employee;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class EmployeeConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(EmployeeConsumerService.class);

    @RabbitListener(queues = "techgeeknext.queue")
    public void consumeMessage(Employee employee) throws CustomInvalidException {
        logger.info("Recieved Message From RabbitMQ techgeeknextExchange: " + employee);
        if (employee.getExperience() < 0 || employee.getExperience() > 30) {
            throw new CustomInvalidException();
        }
    }
}
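The overall flow (a listener that keeps failing, a bounded number of attempts, then dead lettering) can be sketched in plain Java without a broker. Everything below is a hypothetical stand-in: the nested exception replaces CustomInvalidException, the list replaces deadLetter.queue, and the loop only imitates what the listener container and the broker do together.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical broker-free imitation of "retry, then dead-letter"; not Spring's implementation. */
class RetryThenDeadLetterSketch {

    /** Stand-in for the tutorial's checked exception. */
    static class InvalidExperienceException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /** Same rule as the listener: experience must be between 0 and 30. */
    static void validate(int experience) throws InvalidExperienceException {
        if (experience < 0 || experience > 30) {
            throw new InvalidExperienceException();
        }
    }

    /**
     * Invokes the validation up to maxAttempts times and returns the attempts used.
     * If every attempt fails, the value is added to deadLetters.
     */
    static int deliver(int experience, int maxAttempts, List<Integer> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                validate(experience);
                return attempt; // handled successfully, no further retries
            } catch (InvalidExperienceException e) {
                // A real container would wait for the configured backoff interval here.
            }
        }
        deadLetters.add(experience); // retries exhausted
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<Integer> deadLetters = new ArrayList<>();
        System.out.println("attempts for exp=35: " + deliver(35, 6, deadLetters));
        System.out.println("attempts for exp=5: " + deliver(5, 6, deadLetters));
        System.out.println("dead letters: " + deadLetters);
    }
}
```

Because the experience value never changes between attempts, a validation failure like this one fails on every retry; retries help mainly with transient errors, while the dead letter queue keeps the permanently invalid message available for inspection.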
Test
- Now test the application locally. Start a local RabbitMQ instance.
- Start the Producer Spring Boot application.
- Send a message with an invalid employee experience of 35 to the consumer by calling the endpoint http://localhost:8080/sendMessage?name=techgeeknext&domain=XYZ&exp=35
- Now start the Consumer Spring Boot application.
- Consumer : RabbitMQ delivers the message from techgeeknext.queue to the consumer application. Since the employee experience is invalid, the listener throws CustomInvalidException. The message is attempted 6 times in total and then placed in the dead letter queue.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.1)
16:00:31.805 INFO 8532 --- [main] c.t.SpringBootRabbitMQConsumer: Starting SpringBootRabbitMQConsumer using Java 12.0.2 on LAPTOP-S6R44CQL with PID 8532 (D:\SpringBootRabbitMQConsumer\target\classes started in D:\SpringBootRabbitMQConsumer)
16:00:31.808 INFO 8532 --- [main] c.t.SpringBootRabbitMQConsumer: No active profile set, falling back to default profiles: default
16:00:32.848 INFO 8532 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http)
16:00:32.856 INFO 8532 --- [main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
16:00:32.856 INFO 8532 --- [main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.46]
16:00:32.939 INFO 8532 --- [main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
16:00:32.939 INFO 8532 --- [main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1088 ms
16:00:33.417 INFO 8532 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
16:00:33.418 INFO 8532 --- [main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
16:00:33.458 INFO 8532 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#37b56ac7:0/SimpleConnection@6ebc9573 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64346]
16:00:33.510 INFO 8532 --- [main] c.t.SpringBootRabbitMQConsumer: Started SpringBootRabbitMQConsumer in 2.069 seconds (JVM running for 2.353)
16:03:10.713 INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService: Recieved Message From RabbitMQ techgeeknextExchange: Employee(name=techgeeknext, domain=XYZ, experience=35)
16:03:13.719 INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService: Recieved Message From RabbitMQ techgeeknextExchange: Employee(name=techgeeknext, domain=XYZ, experience=35)
16:03:19.735 INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService: Recieved Message From RabbitMQ techgeeknextExchange: Employee(name=techgeeknext, domain=XYZ, experience=35)
16:03:29.746 INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService: Recieved Message From RabbitMQ techgeeknextExchange: Employee(name=techgeeknext, domain=XYZ, experience=35)
16:03:39.758 INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService: Recieved Message From RabbitMQ techgeeknextExchange: Employee(name=techgeeknext, domain=XYZ, experience=35)
16:03:49.760 INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService: Recieved Message From RabbitMQ techgeeknextExchange: Employee(name=techgeeknext, domain=XYZ, experience=35)
16:03:49.784 WARN 8532 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"@id":1,"name":"techgeeknext","domain":"XYZ","experience":35}' MessageProperties [headers={__TypeId__=com.techgeeknext.model.Employee}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=techgeeknextExchange, receivedRoutingKey=techgeeknext, deliveryTag=2, consumerTag=amq.ctag-u4Xd3zuiVqOKGf8FzwA7qg, consumerQueue=techgeeknext.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.techgeeknext.service.EmployeeConsumerService.recievedMessage(com.techgeeknext.model.Employee) throws com.techgeeknext.exception.CustomInvalidException' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:247) ~[spring-rabbit-2.3.8.jar:2.3.8]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:191) ~[spring-rabbit-2.3.8.jar:2.3.8]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.8.jar:2.3.8]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.8.jar:2.3.8]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.8.jar:2.3.8]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:93) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:116) ~[spring-retry-1.3.1.jar:na]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.8.jar:5.3.8]
    at org.springframework.amqp.rabbit.listener.$Proxy65.invokeListener(Unknown Source) ~[na:2.3.8]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.8.jar:2.3.8]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552)
- RabbitMQ Exchange
- RabbitMQ Queues
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.1)
16:00:18.798 INFO 17000 --- [ main] c.t.SpringBootRabbitMQProducer : Starting SpringBootRabbitMQProducer using Java 12.0.2 on LAPTOP-S6R44CQL with PID 17000 (D:\SpringBootRabbitMQProducer\target\classes started in D:\SpringBootRabbitMQProducer)
16:00:18.801 INFO 17000 --- [ main] c.t.SpringBootRabbitMQProducer : No active profile set, falling back to default profiles: default
16:00:19.922 INFO 17000 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
16:00:19.932 INFO 17000 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
16:00:19.933 INFO 17000 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.46]
16:00:20.008 INFO 17000 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
16:00:20.008 INFO 17000 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1174 ms
16:00:20.525 INFO 17000 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
16:00:20.534 INFO 17000 --- [ main] c.t.SpringBootRabbitMQProducer : Started SpringBootRabbitMQProducer in 2.073 seconds (JVM running for 2.362)
16:00:44.371 INFO 17000 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
16:00:44.372 INFO 17000 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
16:00:44.374 INFO 17000 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 2 ms
16:01:04.917 INFO 17000 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
16:01:05.029 INFO 17000 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#14229fa7:0/SimpleConnection@58b43bdf [delegate=amqp://guest@127.0.0.1:5672/, localPort= 61153]
Download Source Code
The full source code for this article can be found below. Download it here -
- Producer Spring Boot + RabbitMQ + Error Handling Example
- Consumer Spring Boot + RabbitMQ + Error Handling Example