目錄
1.生產(chǎn)者發(fā)消息到交換機(jī)時候的消息確認(rèn)
2.交換機(jī)給隊列發(fā)消息時候的消息確認(rèn)
3.備用隊列
3.消費(fèi)者手動ack
?文章來源地址http://www.zghlxwxcb.cn/news/detail-484008.html
rabbitmq的發(fā)布確認(rèn)方式,可以有效的保證我們的數(shù)據(jù)不丟失。
?消息正常發(fā)送的流程是:生產(chǎn)者發(fā)送消息到交換機(jī),然后交換機(jī)通過路由鍵把消息發(fā)送給對應(yīng)的隊列,然后消費(fèi)者監(jiān)聽隊列消費(fèi)消息
但是如果生產(chǎn)者發(fā)送的消息,交換機(jī)收不到呢,又或者交換機(jī)通過路由鍵給對應(yīng)的隊列發(fā)消息時,路由鍵不存在呢,這些就是消息發(fā)布確認(rèn)所要解決的問題?
消息的發(fā)布確認(rèn)分別有:
- 生產(chǎn)者發(fā)消息到交換機(jī)時候的消息確認(rèn)
- 以及交換機(jī)發(fā)消息給隊列的消息確認(rèn)
先在application.properties配置文件中加上以下代碼:
# 確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
spring.rabbitmq.publisher-confirm-type= correlated
# 確認(rèn)消息已發(fā)送到隊列
spring.rabbitmq.publisher-returns= true
# 確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
spring.rabbitmq.publisher-confirm-type= correlated
這個意思是開啟confirm模式,這樣的話,當(dāng)生產(chǎn)者發(fā)送消息的時候,無論交換機(jī)是否收到,都會觸發(fā)回調(diào)方法
1.生產(chǎn)者發(fā)消息到交換機(jī)時候的消息確認(rèn)
?寫一個容器:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
// ConfirmCallback:消息只要發(fā)出,無論交換機(jī)有沒有接到消息,都會觸發(fā)ConfirmCallback類的confirm方法
// ConfirmCallback是有個內(nèi)部類
@Component
public class messageConfirm implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init()
{
rabbitTemplate.setConfirmCallback(this);
}
/**
*
* @param correlationData correlationData是發(fā)送消息時候攜帶的消息
* @param ack 如果為true,表示交換機(jī)接收到消息了
* @param message 異常消息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String message) {
if (ack)
{
System.out.println("交換機(jī)收到消息成功:" + correlationData.getId());
}else {
System.out.println("交換機(jī)收到消息失?。? + correlationData.getId() + "原因:" + message);
}
}
}
RabbitTemplate.ConfirmCallback是一個內(nèi)部接口類,只要生產(chǎn)者往交換機(jī)發(fā)送消息,都會該觸發(fā)ConfirmCallback類的confirm方法
注意:
? ? ? ? 因為RabbitTemplate.ConfirmCallback是一個內(nèi)部類,所以我們要通過? ? @PostConstruct注解,把當(dāng)前類賦值給ConfirmCallback
配置類:
package com.example.rabbitmq.發(fā)布確認(rèn);
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class messageConfrimConfig {
@Bean
public DirectExchange getConfrimTopic()
{
// 創(chuàng)建一個直接交換機(jī)
return ExchangeBuilder.directExchange("ljl-ConfrimTopic").build();
}
@Bean
public Queue getConfrimQueue()
{
return new Queue("ljl-ConfrimQueue");
}
@Bean
public Binding TopicConfrimBinding()
{
return BindingBuilder.bind(getConfrimQueue()).to(getConfrimTopic()).with("messageConfirm");
}
}
消費(fèi)者:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class clientConfirm {
@RabbitListener(queues = "ljl-ConfrimQueue")
@RabbitHandler
public void ConfrimQueue(Message message) {
System.out.println("正常隊列正常接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
}
}
生產(chǎn)者:
@RestController
public class testConfirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/sendMessageConfirm")
public String sendMessageConfirm()
{
HashMap<String, Object> mapExchange = new HashMap<>();
mapExchange.put("message","測試交換機(jī)的發(fā)布確認(rèn)消息");
// 關(guān)聯(lián)數(shù)據(jù)的一個類,交換機(jī)無論有沒有收到生產(chǎn)者發(fā)送的消息,都會返回這個對象
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
// 這個是正常發(fā)送的,交換機(jī)的名稱,跟路由鍵的名稱都是存在的
rabbitTemplate.convertAndSend("ljl-ConfrimTopic","messageConfirm",JSONObject.toJSONString(mapExchange),correlationData);
return "成功";
}
}
直接運(yùn)行項目代碼:http://localhost:8080/sendMessageConfirm
?可以看到消息正常發(fā)送,正常消費(fèi),然后交換機(jī)回調(diào)方法
?
當(dāng)交換機(jī)不存在的時候:
一樣會觸發(fā)回調(diào)方法,然后打印錯誤消息?
?
2.交換機(jī)給隊列發(fā)消息時候的消息確認(rèn)
????????寫一個容器,實現(xiàn) RabbitTemplate.ReturnCallback 接口,重寫 returnedMessage 方法,這個方法是當(dāng)交換機(jī)推送消息給隊列的時候,路由鍵不存在就觸發(fā)的方法
????????注意:
? ? ? ????????? 因為RabbitTemplate.ReturnCallback是一個內(nèi)部類,所以我們要通過? ? @PostConstruct注解,把當(dāng)前類賦值給ReturnCallback
寫一個容器類:
package com.example.rabbitmq.發(fā)布確認(rèn);
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
// ConfirmCallback:消息只要發(fā)出,無論交換機(jī)有沒有接到消息,都會觸發(fā)ConfirmCallback類的confirm方法
// ConfirmCallback是個內(nèi)部類
// ReturnCallback是個內(nèi)部類
// ReturnCallback:但不可路由的時候,觸發(fā)回調(diào)方法
@Component
public class messageConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init()
{
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
*
* @param correlationData correlationData是發(fā)送消息時候攜帶的消息
* @param ack 如果為true,表示交換機(jī)接收到消息了
* @param message 異常消息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String message) {
if (ack)
{
System.out.println("交換機(jī)收到消息成功:" + correlationData.getId());
}else {
System.out.println("交換機(jī)收到消息失?。? + correlationData.getId() + "原因:" + message);
}
}
// 當(dāng)routingkey不存在的時候,會觸發(fā)該方法
/**
*
* @param message 消息主體
* @param code 錯誤碼
* @param text 錯誤消息
* @param exchange 推送該消息的交換機(jī)
* @param routingkey 推送消息時的routingkey
*/
@Override
public void returnedMessage(Message message, int code, String text, String exchange, String routingkey) {
System.out.println("交換機(jī)推送消息到隊列失敗,推送的消息是:" + new String(message.getBody()) + "錯誤原因:" + text);
}
}
生產(chǎn)者:
// 這個是正常發(fā)送到交換機(jī)的,但是路由建的名稱不存在
rabbitTemplate.convertAndSend("ljl-ConfrimTopic","messageConfirmAnomaly",JSONObject.toJSONString(mapExchange),correlationData);
?運(yùn)行代碼看效果:
?
3.備用隊列
????????當(dāng)消息不可路由的時候,mq會觸發(fā)returncallback接口的回調(diào)方法,把不可路由的消息回調(diào)回來,但是這有個問題,就是消息雖然回調(diào)過來了,但是并沒有消費(fèi)者去把不可路由的消息給消費(fèi)掉,所以這個時候就要加一個備用隊列和一個報警隊列,報警隊列的作用是用來通知管理員,有什么消息被回退了....然后備用隊列是把消息給保存起來,需要的時候就從備用隊列中取數(shù)據(jù)出來使用
? ? ? ? 注意:當(dāng)我們設(shè)置了備用隊列的時候,returncallback接口的回調(diào)方法將不會被觸發(fā),但是當(dāng)消息不可路由,而且備用隊列也不能使用的時候,才會觸發(fā)returncallback接口的回調(diào)方法,也就是說,觸發(fā)回調(diào)方法在最終條件是消息無法被任何一個隊列接受,在mq丟棄前才會觸發(fā)回調(diào)方法
配置類(加入備用交換機(jī),備用隊列,報警隊列,然后使用的是扇形交換機(jī)):
alternate-exchange 參數(shù):設(shè)置備用交換機(jī),當(dāng)消息不可路由的時候就會把消息推送到該交換機(jī)上
@Configuration
public class messageConfrimConfig {
@Bean
public DirectExchange getConfrimTopic()
{
// 創(chuàng)建一個直接交換機(jī)
// return ExchangeBuilder.directExchange("ljl-ConfrimTopic").build();
// alternate-exchange 參數(shù):設(shè)置備用交換機(jī),當(dāng)消息不可路由的時候就會把消息推送到該交換機(jī)上
return ExchangeBuilder.directExchange("ljl-ConfrimTopic").withArgument("alternate-exchange","ljl-standbyFanoutExchange").build();
}
@Bean
public Queue getConfrimQueue()
{
return new Queue("ljl-ConfrimQueue");
}
@Bean
public Binding TopicConfrimBinding()
{
return BindingBuilder.bind(getConfrimQueue()).to(getConfrimTopic()).with("messageConfirm");
}
// 備用交換機(jī),備用隊列,報警隊列
@Bean
public FanoutExchange standbyFanoutExchange()
{
// 備用交換機(jī)
return new FanoutExchange("ljl-standbyFanoutExchange");
}
@Bean
public Queue getstandbyQueue()
{
// 備用隊列
return new Queue("ljl-standbyQueue");
}
@Bean
public Queue getalarmQueue()
{
// 報警隊列
return new Queue("ljl-alarmQueue");
}
// 設(shè)置備用隊列和備用交換機(jī)的綁定關(guān)系
@Bean
public Binding standbyExchagneBinding()
{
return BindingBuilder.bind(getstandbyQueue()).to(standbyFanoutExchange());
}
// 設(shè)置報警隊列和備用交換機(jī)的綁定關(guān)系
@Bean
public Binding alarmExchagneBinding()
{
return BindingBuilder.bind(getalarmQueue()).to(standbyFanoutExchange());
}
}
在消費(fèi)者上:
?
@Component
public class clientConfirm {
@RabbitListener(queues = "ljl-ConfrimQueue")
@RabbitHandler
public void ConfrimQueue(Message message) {
System.out.println("正常隊列正常接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queues = "ljl-alarmQueue")
@RabbitHandler
public void alarmQueue(Message message) {
System.out.println("報警隊列接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
}
}
執(zhí)行代碼看效果(此時的生產(chǎn)者發(fā)送給mq的路由鍵還是不存在的):
?這個時候會發(fā)現(xiàn)我們設(shè)置的備用交換機(jī)沒有起到效果,這是因為我們在修改參數(shù)的時候
在mq中并沒有起到效果,在是因為原本‘ljl-ConfrimTopic' 交換機(jī)已經(jīng)存在,寫的參數(shù)并不會覆蓋之前的,我們需要把這個交換機(jī)給刪掉,然后再執(zhí)行一起看下效果:
?
報警交換機(jī)的作用生效了,不可路由的時候不會觸發(fā)?returncallback接口的回調(diào)方
?
3.消費(fèi)者手動ack
在配置文件中加入:
#開啟手動ack spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual#設(shè)置消費(fèi)者每一次拉取的條數(shù) spring.rabbitmq.listener.simple.prefetch= 5
消費(fèi)者:
?在消費(fèi)者的方法上,加上這個類(Channel channel),然后這個類有幾個方法:
1.消費(fèi)者正常消費(fèi)完成該消息,手動返回ack,然后隊列把消息移除掉:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 參數(shù)1:message.getMessageProperties().getDeliveryTag()表示的是這條消息在隊列中的一個標(biāo)志,刪除的時候也是根據(jù)這個標(biāo)志來進(jìn)行刪除 參數(shù)2:是否要批量確認(rèn),這個意思是:是否把小于等于message.getMessageProperties().getDeliveryTag()值的消息批量確認(rèn)
2.消費(fèi)者在消費(fèi)消息的過程中,出現(xiàn)了異常,那么就可以使用channel.basicNack方法
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); 參數(shù)1:標(biāo)志 參數(shù)2:是否批量.true:將一次性拒絕所有小于deliveryTag的消息。 參數(shù)3:是否重新進(jìn)入隊列? 出現(xiàn)異常的時候,可以的用參數(shù)3來指定該條消息是否重新入隊,然后參數(shù)2來控制這個操作是否批量操作
對于手動ack以及消息阻塞的一些總結(jié):
? ? ? ? 假設(shè)生產(chǎn)者發(fā)送了一百條消息
????????現(xiàn)在只有一個消費(fèi)者,然后設(shè)置消費(fèi)者每一次拉取10條消息來消費(fèi)(默認(rèn)好像200多條),這個時候的正常流程就是消費(fèi)者拉取一批消息,然后正常消費(fèi),通過返回ack,接著拉取消息來進(jìn)行下一批消費(fèi),假如出現(xiàn)異常那就需要使用basicNack方法來判斷是否要重新入隊,但是異常消息入隊后,被消費(fèi)者重新消費(fèi),還是會出現(xiàn)異常,這個時候就會一直循環(huán),造成消息堆積
????????兩個消費(fèi)者:假設(shè)其中一個消費(fèi)者A可以正常消費(fèi)消息并正常返回ack,而另外一個消費(fèi)者B會中會出現(xiàn)異常,使用basicNack方法讓消息重新入隊,然后重新入隊的消息有可能會被消費(fèi)者A獲取,然后正常消費(fèi)并正常手動返回ack
? ? ? ? 面試題:如何rabbitmq確保消息不丟失/消息的可靠性
? ? ? ? 在生產(chǎn)者生成消息的時候,去開啟confirm模式,寫一個容器類去實行confirmcallback接口,這樣交換機(jī)是否成功收到消息都會觸發(fā)回調(diào)方法,然后在聲明交換機(jī),聲明隊列,以及發(fā)送消息的時候,做持久化處理,然后開啟消息回退模式,寫一個容器類去實現(xiàn)returncallback接口,這樣當(dāng)交換機(jī)推送消息給隊列時,如果失敗會觸發(fā)回調(diào)方法,在消費(fèi)者這邊,開啟手動ack模式,確保消息正常執(zhí)行完畢,然后還可以去配置備用隊列跟死信隊列,這樣就可以基本上確保mq的消息不會丟失了? ? ? ?
? ? ? ? 以上就是總體的解答思路,大家用自己的話來總結(jié)就行文章來源:http://www.zghlxwxcb.cn/news/detail-484008.html
?
到了這里,關(guān)于springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!