延遲和死信隊列的配置
- 延遲隊列有效期一分鐘,后進(jìn)入死信隊列,如果異常就進(jìn)入異常隊列
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交換機(jī)
*/
private String orderEventExchange="order.event.exchange";
/**
* 延遲隊列, 不能被監(jiān)聽消費
*/
private String orderCloseDelayQueue="order.close.delay.queue";
/**
* 關(guān)單隊列, 延遲隊列的消息過期后轉(zhuǎn)發(fā)的隊列,被消費者監(jiān)聽
*/
private String orderCloseQueue="order.close.queue";
/**
* 進(jìn)入延遲隊列的路由key
*/
private String orderCloseDelayRoutingKey="order.close.delay.routing.key";
/**
* 進(jìn)入死信隊列的路由key,消息過期進(jìn)入死信隊列的key
*/
private String orderCloseRoutingKey="order.close.routing.key";
/**
* 過期時間 毫秒,臨時改為1分鐘定時關(guān)單
*/
private Integer ttl=1000*60;
/**
* 消息轉(zhuǎn)換器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 創(chuàng)建交換機(jī) Topic類型,也可以用dirct路由
* 一般一個微服務(wù)一個交換機(jī)
* @return
*/
@Bean
public Exchange orderEventExchange(){
return new TopicExchange(orderEventExchange,true,false);
}
/**
* 延遲隊列
*/
@Bean
public Queue orderCloseDelayQueue(){
Map<String,Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange",orderEventExchange);
args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
args.put("x-message-ttl",ttl);
return new Queue(orderCloseDelayQueue,true,false,false,args);
}
/**
* 死信隊列,普通隊列,用于被監(jiān)聽
*/
@Bean
public Queue orderCloseQueue(){
return new Queue(orderCloseQueue,true,false,false);
}
/**
* 第一個隊列,即延遲隊列的綁定關(guān)系建立
* @return
*/
@Bean
public Binding orderCloseDelayBinding(){
return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null);
}
/**
* 死信隊列綁定關(guān)系建立
* @return
*/
@Bean
public Binding orderCloseBinding(){
return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null);
}
}
異常隊列配置類
public class RabbitMQErrorConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 異常交換機(jī)
*/
private String orderErrorExchange = "order.error.exchange";
/**
* 異常隊列
*/
private String orderErrorQueue = "order.error.queue";
/**
* 異常routing.key
*/
private String orderErrorRoutingKey = "order.error.routing.key";
/**
* 異常交換機(jī)
* @return
*/
@Bean
public TopicExchange errorTopicExchange(){
return new TopicExchange(orderErrorExchange,true,false);
}
/**
* 異常隊列
* @return
*/
@Bean
public Queue errorQueue(){
return new Queue(orderErrorQueue,true);
}
/**
* 隊列交換機(jī)進(jìn)行綁定
* @param errorQueue
* @return
*/
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){
return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(orderErrorRoutingKey);
}
/**
* 配置 RepublishMessageRecoverer
* 用途:消息重試一定次數(shù)后,用特定的routingKey轉(zhuǎn)發(fā)到指定的交換機(jī)中,方便后續(xù)排查和告警
*
* 頂層是 MessageRecoverer接口,多個實現(xiàn)類
*
* @return
*/
@Bean
public MessageRecoverer messageRecoverer(){
return new RepublishMessageRecoverer(rabbitTemplate,orderErrorExchange,orderErrorRoutingKey);
}
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-623612.html
文章來源:http://www.zghlxwxcb.cn/news/detail-623612.html
到了這里,關(guān)于RabbitMQ延遲隊列,死信隊列配置的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!