最近遇到了關于 RabbitMQ 的問題,打比方說:某個微服務模塊中,RabbitMQ 的大部分消費者需要重試兩次,而小部分消費者由于特殊原因并不需要進行重試。這就涉及到自定義重試次數(shù)的話題了,但在網(wǎng)上找了一圈沒發(fā)現(xiàn)相關的,但是功夫不負有心人,最后還是解決了這個問題,接下來給大家分享一下~
目錄
1 默認配置重試次數(shù)
2 自定義重試次數(shù)
2.1 消費者
①?配置文件
② 配置隊列,綁定交換機
③ 消費者文件
2.2 生產者
①?配置文件
② 生產者文件
③?測試文件
2.3 啟動測試文件
1 默認配置重試次數(shù)
一般來說,關于 RabbitMQ 的重試次數(shù)是直接在配置文件中進行定義(比如 application.yml),那么所有的消費者都將遵循這個配置條件,比如 ??
spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.retry.enabled=true # 開啟消費者重試機制
spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重試次數(shù)
spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重試時間間隔
該配置中的 max-attempts 決定了消費者的重試次數(shù),不過有一點需求注意:max-attempts 指的是嘗試次數(shù),就是說最開始消費的那一次也是計算在內的,那么 max-attempts: 3 便是重試兩次,另外一次是正常消費~
同時消費者遵循的是本模塊的 RabbitMQ 配置,并不會讀取生產者的配置。打比方說,生產者模塊配置重試 3?次,而消費者模塊配置重試 1 次,那么生產者給消費者發(fā)送消息,消費者進行消費,如果觸發(fā)了重試,消費者也只會重試一次,它只遵循消費者模塊的配置??!
如上,默認配置重試次數(shù)就算完成了,但是并沒有實現(xiàn)針對不同消費者的自定義重試功能,請繼續(xù)看第二章內容。
2 自定義重試次數(shù)
以應用廣泛的訂閱模式為例,由于消費者和生產者配置不一,注意消費者和生產者不在同一模塊!因此分開闡述:
2.1 消費者
主要配置是在消費者這!!
①?配置文件
對于消費者來說,該配置不僅起到了連接作用,同時也啟動了重試機制,默認重試 2 次。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.retry.enabled=true # 開啟消費者重試機制
spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重試次數(shù)
spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重試時間間隔
② 配置隊列,綁定交換機
package com.yinyu.consumer.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
@Configuration
public class FanoutRabbitConfig {
@Autowired
private ConnectionFactory connectionFactory;
//自定義工廠
@Bean
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setAdviceChain(retries());
return factory;
}
@Bean
public RetryOperationsInterceptor retries() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(1) //設置最大嘗試次數(shù)為1(不重試)
.backOffOptions(1000, 3.0, 10000)
.recoverer(new RejectAndDontRequeueRecoverer()).build();
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue retryTest1() {
return new Queue("yinyu.retryTest1");
}
@Bean
public Queue retryTest2() {
return new Queue("yinyu.retryTest2");
}
@Bean
public Exchange topicExchange() {
return new TopicExchange("yinyu");//交換機命名
}
//隊列綁定交換機
@Bean
public List<Binding> allActivateBinding() {
return Arrays.asList(BindingBuilder.bind(
BindingBuilder.bind(retryTest1()).to(topicExchange()).with("yinyu.retryTest1").noargs(),
BindingBuilder.bind(retryTest2()).to(topicExchange()).with("yinyu.retryTest2").noargs());
}
}
③ 消費者文件
用于接收消息,設置了一個對照組,一個自定義配置,一個默認配置
package com.yinyu.consumer.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiverA {
@RabbitListener(queues = "yinyu.retryTest1", containerFactory = "listenerContainerFactory")
public void retryReceiver1(Map<String,String> map) {
log.info("retryTest 自定義配置開始, key: {}", map.get("key"));
if (!Objects.equals(map.get("key"), "yinyu")){
throw new RuntimeException("value 值匹配不準確,請重新進行請求??!");
}
log.info("retryTest 結束");
}
@RabbitListener(queues = "yinyu.retryTest2")
public void retryReceiver2(Map<String,String> map) {
log.info("retryTest 默認配置開始, key: {}", map.get("key"));
if (!Objects.equals(map.get("key"), "yinyu")){
throw new RuntimeException("value 值匹配不準確,請重新進行請求??!");
}
log.info("retryTest 結束");
}
}
2.2 生產者
生產者不需要過多的配置,它的作用是發(fā)送消息
①?配置文件
寫在 application.properties 中,對于生產者來說奇起的是連接 rabbitmq 作用,如果它是調用其他模塊的消費者,那么這個重試配置是不起作用的。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.retry.enabled=true # 開啟消費者重試機制
spring.rabbitmq.listener.simple.retry.max-attempts=5 # 最大重試次數(shù)
spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重試時間間隔
② 生產者文件
用于發(fā)送消息,
package com.yinyu.producer.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void retryProducer1(){
Map<String,String> map = new HashMap<>();
map.put("key","yinyu自定義");
rabbitTemplate.convertAndSend("yinyu", "yinyu.retryTest1", map);
}
public void retryProducer2(){
Map<String,String> map = new HashMap<>();
map.put("key","yinyu默認");
rabbitTemplate.convertAndSend("yinyu", "yinyu.retryTest2", map);
}
}
③?測試文件
package com.yinyu.producer.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {
@Autowired
private Sender sender;
//測試自定義配置
@Test
public void testCustomConfig() {
sender.retryProducer1();
}
//測試默認配置
@Test
public void testDefaultConfig() {
sender.retryProducer2();
}
}
2.3 啟動測試文件
有條件的各位可以啟動一下生產者的測試文件中這兩個方法,最終結果:文章來源:http://www.zghlxwxcb.cn/news/detail-631551.html
- retryProducer1 發(fā)送消息后,retryReceiver1 消費消息,雖然報錯,但沒有重試(遵循自定義配置)
- retryProducer2 發(fā)送消息后,retryReceiver2 消費消息,報錯且重試 4 次(遵循默認配置)
完美實現(xiàn)自定義重試次數(shù)的需求??!文章來源地址http://www.zghlxwxcb.cn/news/detail-631551.html
到了這里,關于【中間件】RabbitMQ 自定義重試次數(shù)(針對同一模塊不同消費者)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!