在 RabbitMQ之生產(chǎn)者發(fā)布確認(rèn)原理章節(jié)已經(jīng)介紹了rabbitmq生產(chǎn)者是如何對(duì)消息進(jìn)行發(fā)布確認(rèn)保證消息不丟失的。本章節(jié)繼續(xù)看下springboot整合rabbitmq后是如何保證消息不丟失的。
1. 消息需要發(fā)布確認(rèn)的原因
消息正常是通過(guò)生產(chǎn)者生產(chǎn)消息傳遞到交換機(jī),然后經(jīng)過(guò)交換機(jī)路由到消息隊(duì)列中,最后消費(fèi)者消費(fèi),如下圖所示
上述為正常情況,但也有情況會(huì)導(dǎo)致消息丟失的情況:第一種情況,交換機(jī)重啟,當(dāng)消息經(jīng)過(guò)消費(fèi)者投遞后,恰巧交換機(jī)正在重啟,會(huì)導(dǎo)致生產(chǎn)者投遞消息失敗,從而導(dǎo)致消息丟失;第二種情況,交換機(jī)沒(méi)問(wèn)題,消息投遞到交換機(jī)后,交換機(jī)路由到消息隊(duì)列過(guò)程中出了問(wèn)題,比如routingKey錯(cuò)誤導(dǎo)致路由不到隊(duì)列中。
針對(duì)上述兩種導(dǎo)致消息丟失的情況,下面采用消息確認(rèn)發(fā)布機(jī)制,分別采取消息正確投遞到交換機(jī)后回調(diào)接口來(lái)確認(rèn)消息正確被投遞,消息經(jīng)交換機(jī)正確路由到隊(duì)列中回調(diào)接口來(lái)確認(rèn)消息正確被路由。
2. 消息發(fā)送交換機(jī)后回調(diào)接口ConfirmCallback ,保證消息在發(fā)送交換機(jī)處不丟失
當(dāng)消息經(jīng)生產(chǎn)者投遞到交換機(jī)后,為避免消息丟失,需要回調(diào)RabbitTemplate.ConfirmCallback
接口,回調(diào)接口后,尤其是要對(duì)投遞失敗的消息進(jìn)行處理或者記錄下來(lái)保證消息不丟失。該接口不管消息投遞到交換機(jī)成功或者失敗都會(huì)進(jìn)行回調(diào),未避免消息丟失,可以選擇在回調(diào)接口中只處理或者登記投遞失敗的消息,達(dá)到消息不丟失的目的。
下面通過(guò)案例演示生產(chǎn)者投遞消息到交換機(jī)后回調(diào)ConfirmCallback
接口情況。
`
2.1 在application.properties中添加spring.rabbitmq.publisher-confirm-type=correlated配置,開(kāi)啟回調(diào)機(jī)制
spring.rabbitmq.host=192.168.xx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
2.2 聲明交換機(jī)和隊(duì)列并進(jìn)行綁定
聲明confirm_exchange交換機(jī),聲明confirm_queue隊(duì)列,并通過(guò)routingKey=confirm綁定交換機(jī)和隊(duì)列。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String ROUTING_KEY = "confirm";
/*聲明交換機(jī)*/
@Bean
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
/*聲明隊(duì)列*/
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/*綁定交換機(jī)和隊(duì)列*/
@Bean
public Binding exchangeBindingQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
@Qualifier("confirmQueue") Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
}
}
2.3 實(shí)現(xiàn)RabbitTemplate.ConfirmCallback
接口
當(dāng)消息由生產(chǎn)者發(fā)到交換機(jī)后會(huì)回調(diào)該接口中的confirm方法
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback {
/* correlationData 內(nèi)含消息內(nèi)容
* ack 交換機(jī)接受成功或者失敗。 true表示交換機(jī)接受消息成功, false表示交換機(jī)接受失敗
* cause 表示失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("hello world");
String id = correlationData.getId();
String message = new String(correlationData.getReturnedMessage().getBody());
if (ack){
log.info("交換機(jī)收到消息id為{}, 消息內(nèi)容為{}", id, message);
}else {
log.info("交換機(jī)未收到消息id為{}, 消息內(nèi)容為{}, 原因?yàn)閧}", id, message, cause);
}
}
}
2.4 創(chuàng)建生產(chǎn)者
創(chuàng)建生產(chǎn)者生產(chǎn)消息,本案例中生產(chǎn)者發(fā)送了2個(gè)消息,分別為hello rabbitmq 1
和hello rabbitmq 2
。
在創(chuàng)建生產(chǎn)者時(shí)要設(shè)置一下需要回調(diào)的接口ExchangeCallback
,在設(shè)置回調(diào)接口時(shí)用了java的@PostConstruct
注解,該注解作用用來(lái)指定bean初始化的順序,Constructor(構(gòu)造方法) -> @Autowired(依賴(lài)注入) -> @PostConstruct(注釋的方法)
。也就是說(shuō)在初始化Producer3對(duì)象時(shí),先調(diào)了Producer3的默認(rèn)無(wú)參構(gòu)造函數(shù);然后執(zhí)行Autowired注解部分,從Spring IOC容器中尋找rabbitTemplate和exchangeCallback對(duì)象分別注入到Producer3的rabbitTemplate和exchangeCallback屬性中;最后執(zhí)行PostConstruct注解部分,把exchangeCallback設(shè)置到rabbitTemplate中。
在發(fā)送hello rabbitmq 2
消息時(shí)故意把routingKey寫(xiě)錯(cuò)導(dǎo)致hello rabbitmq 2
消息不能從交換機(jī)發(fā)送到隊(duì)列中,為下一節(jié)做鋪墊。
import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class Producer3 {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ExchangeCallback exchangeCallback;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(exchangeCallback);
}
public void produceMessage(){
String message = "hello rabbitmq 1";
CorrelationData correlationData1 = new CorrelationData("1");
correlationData1.setReturnedMessage(new Message(message.getBytes()));
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
message = "hello rabbitmq 2";
CorrelationData correlationData2 = new CorrelationData("2");
correlationData2.setReturnedMessage(new Message(message.getBytes()));
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
}
}
2.5 啟動(dòng)測(cè)試
@SpringBootApplication
@Slf4j
public class SpringbootDemo {
public static void main(String[] args) {
ConfigurableApplicationContext app = SpringApplication.run(SpringbootDemo.class, args);
Producer3 producer = app.getBean(Producer3.class);
producer.produceMessage();
}
}
啟動(dòng)上面測(cè)試案例,生產(chǎn)者發(fā)送了id為1的hello rabbitmq 1
消息和id為2的hello rabbitmq 2
,交換機(jī)收到消息也回調(diào)了ExchangeCallback
接口對(duì)2條消息都進(jìn)行了確認(rèn),尤其是對(duì)于失敗的消息要在此步保存失敗的消息,避免消息在交換機(jī)這一步丟失。
2022-08-04 00:43:28.817 INFO 13512 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:43:28.823 INFO 13512 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-04 00:43:28.827 INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:43:28.828 INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2
但上面還有一個(gè)問(wèn)題,雖然回調(diào)ExchangeCallback接口,可以保證消息到交換機(jī)一步不會(huì)丟失,但如果交換機(jī)到隊(duì)列的過(guò)程中出現(xiàn)了問(wèn)題,消息一樣會(huì)丟失。比如上面生產(chǎn)者把routingKey寫(xiě)錯(cuò)了,就會(huì)導(dǎo)致hello rabbitmq 2
消息從交換機(jī)路由不到隊(duì)列中。下面創(chuàng)建消費(fèi)者程序,看消費(fèi)者消費(fèi)confirm_queue隊(duì)列中
消息情況
import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class Consumer3 {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void consuemrMessage(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("消費(fèi)者消費(fèi)消息{}", msg);
}
}
然后重新啟動(dòng)測(cè)試代碼,輸出如下,可以看出消費(fèi)端只消費(fèi)了hello rabbitmq 1
,而hello rabbitmq 2
消息則丟失了。
2022-08-04 00:45:28.817 INFO 13512 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:45:28.823 INFO 13512 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-04 00:45:28.827 INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:45:28.828 INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-04 00:45:28.831 INFO 13512 --- [ntContainer#2-1] com.lzj.consumer.Consumer3 : 消費(fèi)者消費(fèi)消息hello rabbitmq 1
3. 消息經(jīng)交換機(jī)路由到隊(duì)列后回調(diào)接口ReturnCallback ,保證消息在發(fā)送隊(duì)列處不丟失
為解決上一節(jié)中,消息由交換機(jī)和消息隊(duì)列中異常,導(dǎo)致消息丟失問(wèn)題,解決辦法就是在添加消息從交換機(jī)路由到隊(duì)列中失敗后回調(diào)的接口,在回調(diào)接口中把失敗的消息保存下來(lái)就可以避免消息丟失了。
在回調(diào)接口之前還需為RabbitMQ設(shè)置Mandatory標(biāo)志,只有當(dāng)該標(biāo)志為true時(shí),消息由交換機(jī)到隊(duì)列失敗后才會(huì)回調(diào)接口;如果該標(biāo)志設(shè)置false時(shí),消息由交換機(jī)路由到隊(duì)列失敗后自動(dòng)丟棄消息,會(huì)導(dǎo)致消息丟失,這也是默認(rèn)設(shè)置,所以如需保證消息不丟失,要打開(kāi)Mandatory標(biāo)志。
下面繼續(xù)進(jìn)行上面案例,在上面案例的基礎(chǔ)上添加ReturnCallback接口實(shí)現(xiàn)
package com.lzj.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息 {} 經(jīng)交換機(jī) {} 通過(guò)routingKey={} 路由到隊(duì)列失敗,失敗code為:{}, 失敗原因?yàn)椋簕}",
new String(message.getBody()), exchange, routingKey, replyCode, replyText);
}
}
然后修改上面的生產(chǎn)者Producer3 ,只需修改2點(diǎn):第一點(diǎn),為RabbitTemplate 設(shè)置Mandatory標(biāo)志;第二點(diǎn),把ReturnCallback的實(shí)現(xiàn)加入監(jiān)聽(tīng)。
import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import com.lzj.config.QueueCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class Producer3 {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ExchangeCallback exchangeCallback;
@Autowired
private QueueCallback queueCallback;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(exchangeCallback);
/**
* true:交換機(jī)無(wú)法將消息進(jìn)行路由時(shí),會(huì)將該消息返回給生產(chǎn)者
* false:如果發(fā)現(xiàn)消息無(wú)法進(jìn)行路由,則直接丟棄
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(queueCallback);
}
public void produceMessage(){
String message = "hello rabbitmq 1";
CorrelationData correlationData1 = new CorrelationData("1");
correlationData1.setReturnedMessage(new Message(message.getBytes()));
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData1.getId(), message);
message = "hello rabbitmq 2";
CorrelationData correlationData2 = new CorrelationData("2");
correlationData2.setReturnedMessage(new Message(message.getBytes()));
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData2.getId(), message);
}
}
下面重新啟動(dòng)上面的測(cè)試案例,可以得出如下測(cè)試結(jié)果,生產(chǎn)者生產(chǎn)的hello rabbitmq 1
和hello rabbitmq 2
消息都已發(fā)到交換機(jī),也都被交換機(jī)確認(rèn)了,但hello rabbitmq 2
被交換機(jī)路由到隊(duì)列時(shí)由于routingKey錯(cuò)誤導(dǎo)致路由失敗,已在ReturnCallback接口回調(diào)中被記錄下來(lái),最終正確被路由到隊(duì)列中的消息只有hello rabbitmq 1
,從打印日志看也只有hello rabbitmq 1
被消費(fèi)了。
2022-08-05 00:39:29.670 INFO 6832 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-05 00:39:29.683 INFO 6832 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-05 00:39:29.702 INFO 6832 --- [nectionFactory3] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-05 00:39:29.704 INFO 6832 --- [nectionFactory1] com.lzj.config.QueueCallback : 消息 hello rabbitmq 2 經(jīng)交換機(jī) confirm_exchange 通過(guò)routingKey=error_routingkey 路由到隊(duì)列失敗,失敗code為:312, 失敗原因?yàn)椋篘O_ROUTE
2022-08-05 00:39:29.705 INFO 6832 --- [nectionFactory2] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-05 00:39:29.707 INFO 6832 --- [ntContainer#2-1] com.lzj.consumer.Consumer3 : 消費(fèi)者消費(fèi)消息hello rabbitmq 1
結(jié)論:從上面2個(gè)案例可以看出,通過(guò)ConfirmCallback和ReturnCallback接口的回調(diào),保證了消息在交換機(jī)和隊(duì)列處消息不丟失。
4. 備份交換機(jī)
備份交換機(jī)目的是解決消息交換機(jī)路由到隊(duì)列中失敗時(shí)保證消息不丟失的一種方法,與上面第3節(jié)中回調(diào)ReturnCallback達(dá)到的目的一致,但不管是回調(diào)ConfirmCallback還是回調(diào)ReturnCallback都增加了生產(chǎn)者的負(fù)擔(dān),當(dāng)消息異常時(shí)會(huì)增加生產(chǎn)者的處理時(shí)間,導(dǎo)致生產(chǎn)端生產(chǎn)消息的吞吐量下降。另外對(duì)異常的消息也不可以用死信隊(duì)列(死信隊(duì)列詳解參考此篇文章),因?yàn)橄⒃谕哆f到交換機(jī)和隊(duì)列過(guò)程中異常時(shí)無(wú)法被投遞到死信交換機(jī),因?yàn)椴粷M足死信隊(duì)列的3個(gè)條件(參見(jiàn)死信隊(duì)列篇章)。因此備份交換機(jī)就是一種處理交換機(jī)路由隊(duì)列過(guò)程中失敗時(shí)保證消息不丟失的不錯(cuò)選擇。
以下圖為例,消息正常是有Producer生產(chǎn)者到confirm_exchange交換機(jī),再由confirm_exchange交換機(jī)路由到confirm_queue隊(duì)列中,最后被confirm_consumer消費(fèi)掉。如果消息從Producer到confirm_exchange過(guò)程中出現(xiàn)了異常會(huì)回調(diào)ConfirmCallback接口保證消息不丟失。如果消息在confirm_exchange交換機(jī)到confirm_queue出現(xiàn)了異常,在前面案例中是回調(diào)ReturnCallback接口保證消息不丟失的,而此處是通過(guò)備用交換機(jī)實(shí)現(xiàn),異常消息會(huì)被投遞到back_exchange交換機(jī)中,該交換機(jī)可以是fanout類(lèi)型交換機(jī),可以把消息分別路由到backup_queue隊(duì)列和warn_queue隊(duì)列中,backup_queue隊(duì)列中消息由backup_consumer消費(fèi)者消費(fèi),warn_queue隊(duì)列中消息由warn_consumer消費(fèi),其中backup_consumer用于備份失敗的消息,比如失敗的消息存儲(chǔ)到數(shù)據(jù)庫(kù)中,warn_consumer用于告警運(yùn)維或開(kāi)發(fā)人員有失敗的消息。
1. 首先修改上述案例中ConfirmConfig類(lèi),用于聲明confirm_exchange、backup_exchange交換機(jī),聲明confirm_queue、backup_queue、warn_queue隊(duì)列,并對(duì)交換機(jī)和隊(duì)列進(jìn)行綁定。
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String BACKUP_QUEUE_NAME = "backup_queue";
public static final String WARN_QUEUE_NAME = "warn_queue";
/*聲明確認(rèn)交換機(jī)*/
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME) //設(shè)置備份交換機(jī)
.build();
}
/*聲明確認(rèn)隊(duì)列*/
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/*綁定確認(rèn)交換機(jī)和確認(rèn)隊(duì)列*/
@Bean
public Binding confirmExchangeBindingQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
@Qualifier("confirmQueue") Queue confirmQueue){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm");
}
/*聲明備份交換機(jī)*/
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
/*聲明備份隊(duì)列*/
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
/*綁定備份交換機(jī)和備份隊(duì)列*/
@Bean
public Binding backupExchangeBindingBackupQueue(@Qualifier("backupExchange")FanoutExchange backupExchange,
@Qualifier("backupQueue")Queue backupQueue){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
/*聲明警告隊(duì)列*/
@Bean("warnQueue")
public Queue warnQueue(){
return QueueBuilder.durable(WARN_QUEUE_NAME).build();
}
/*綁定備份交換機(jī)和警告隊(duì)列*/
@Bean
public Binding backupExchangeBindingWarnQueue(@Qualifier("backupExchange")FanoutExchange backupExchange,
@Qualifier("warnQueue")Queue warnQueue){
return BindingBuilder.bind(warnQueue).to(backupExchange);
}
}
2. 創(chuàng)建ConfirmConsumer消費(fèi)者用于消費(fèi)confirm_queue中的消息
import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void consuemrMessage(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("消費(fèi)者消費(fèi)消息{}", msg);
}
}
3. 創(chuàng)建BackupConsumer消費(fèi)者用于消費(fèi)backup_queue中的消息
import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class BackupConsumer {
@RabbitListener(queues = ConfirmConfig.BACKUP_QUEUE_NAME)
public void backupMessage(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("消息{} 備份到數(shù)據(jù)庫(kù)", msg);
}
}
- 創(chuàng)建WarnConsumer消費(fèi)者用于消費(fèi)warn_queue中消息
import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WarnConsumer {
@RabbitListener(queues = ConfirmConfig.WARN_QUEUE_NAME)
public void warnMessage(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("消息{} 消費(fèi)失敗,請(qǐng)盡快處理", msg);
}
}
5. 修改上述案例中的生產(chǎn)者Producer3中發(fā)消息時(shí)的routingKey為confirm,修改后代碼如下
import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import com.lzj.config.QueueCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class Producer3 {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ExchangeCallback exchangeCallback;
@Autowired
private QueueCallback queueCallback;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(exchangeCallback);
/**
* true:
* 交換機(jī)無(wú)法將消息進(jìn)行路由時(shí),會(huì)將該消息返回給生產(chǎn)者
* false:
* 如果發(fā)現(xiàn)消息無(wú)法進(jìn)行路由,則直接丟棄
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(queueCallback);
}
public void produceMessage(){
String message = "hello rabbitmq 1";
CorrelationData correlationData1 = new CorrelationData("1");
correlationData1.setReturnedMessage(new Message(message.getBytes()));
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "confirm", message, correlationData1);
log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData1.getId(), message);
message = "hello rabbitmq 2";
CorrelationData correlationData2 = new CorrelationData("2");
correlationData2.setReturnedMessage(new Message(message.getBytes()));
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData2.getId(), message);
}
}
然后重新啟動(dòng)測(cè)試程序進(jìn)行測(cè)試,輸出代碼如下所示,生產(chǎn)者生產(chǎn)了hello rabbitmq 1
和hello rabbitmq 2
消息,并且2條消息偶成功發(fā)到了confirm_exchange交換機(jī)中,由于hello rabbitmq 2
的routingKey寫(xiě)的有問(wèn)題,導(dǎo)致hello rabbit 1
消息被confirm_consumer正常消費(fèi),而hello rabbit 2
則被投遞到了backup_exchange備份交換機(jī)中,然后改消息被backup_consumer消費(fèi)者保存,并同時(shí)被warn_consumer發(fā)出警告。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-424314.html
2022-08-18 00:56:29.549 INFO 7088 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-18 00:56:29.559 INFO 7088 --- [nectionFactory1] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-18 00:56:29.563 INFO 7088 --- [ main] com.lzj.producer.Producer3 : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-18 00:56:29.578 INFO 7088 --- [ntContainer#0-1] com.lzj.consumer.BackupConsumer : 消息hello rabbitmq 2 備份到數(shù)據(jù)庫(kù)
2022-08-18 00:56:29.578 INFO 7088 --- [ntContainer#1-1] com.lzj.consumer.ConfirmConsumer : 消費(fèi)者消費(fèi)消息hello rabbitmq 1
2022-08-18 00:56:29.580 INFO 7088 --- [ntContainer#4-1] com.lzj.consumer.WarnConsumer : 消息hello rabbitmq 2 消費(fèi)失敗,請(qǐng)盡快處理
2022-08-18 00:56:29.584 INFO 7088 --- [nectionFactory1] com.lzj.config.ExchangeCallback : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2
可見(jiàn),備份隊(duì)列f方式保證了消息不丟失。如果同時(shí)設(shè)置了備份隊(duì)列以及第二個(gè)案例中的ReturnCallback回調(diào)接口,失敗消息會(huì)先進(jìn)備份隊(duì)列,因?yàn)閭浞蓐?duì)列優(yōu)先級(jí)高于回調(diào)RetrunCallback接口。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-424314.html
到了這里,關(guān)于8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!