1.RabbitMQ的消息可靠性投遞
- 什么是消息的可靠性投遞
- 保證消息百分百發(fā)送到消息隊列中去
- 保證MQ節(jié)點(diǎn)成功接收消息
- 消息發(fā)送端需要接收到MQ服務(wù)端接收到消息的確認(rèn)應(yīng)答
- 完善的消息補(bǔ)償機(jī)制,發(fā)送失敗的消息可以再感知并二次處理
- RabbitMQ消息投遞路徑
- 生產(chǎn)者–>交換機(jī)–>隊列–>消費(fèi)者
- 通過兩個節(jié)點(diǎn)控制消息的可靠性投遞
- 生產(chǎn)者到交換機(jī):通過confirmCallback
- 交換機(jī)到隊列:通過returnCallback
- 建議
- 開啟消息確認(rèn)機(jī)制以后,保證了消息的準(zhǔn)確送達(dá),但由于頻繁的確認(rèn)交互,RabbitMQ整體效率變低,吞吐量下降嚴(yán)重,不是很重要的消息不建議使用消息確認(rèn)機(jī)制
2.RabbitMQ消息可靠性投遞confirmCallback實(shí)戰(zhàn)
-
生產(chǎn)者到交換機(jī)
- 通過confirmCallback
- 生產(chǎn)者投遞消息后,如果Broker收到消息后,會給生產(chǎn)者一個ACK。生產(chǎn)者通過ACK可以確認(rèn)這條消息是否正常發(fā)送到Broker,這種方式是消息可靠性投遞的核心
-
開啟confirmCallback配置
spring.rabbitmq.publisher-confirm-type=correlated
-
消息發(fā)送測試
package com.gen; import com.gen.config.RabbitMQConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class GenRabbitmqApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void testConfirmCallback() { this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println(correlationData); System.out.println(cause); if (ack) { System.out.println("發(fā)送成功"); } else { System.out.println("發(fā)送失敗"); } }); this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "您有新訂單?。?!"); // 模擬消息投遞失敗 // this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"1", "order.new", "您有新訂單?。?!"); } }
3.RabbitMQ消息可靠性投遞returnCallback實(shí)戰(zhàn)
-
交換機(jī)到隊列
- 通過returnCallback
- 消息從交換機(jī)發(fā)送到對應(yīng)隊列失敗時觸發(fā)
- 兩種模式
- 交換機(jī)到隊列不成功,則丟棄消息(默認(rèn))
- 交換機(jī)到隊列不成功,返回給消息生產(chǎn)者,觸發(fā)returnCallback
-
配置文件開啟配置
# 開啟returnCallback配置 spring.rabbitmq.publisher-returns=true # 修改交換機(jī)投遞到隊列失敗的策略,true交換機(jī)處理消息到路由失敗會返回給生產(chǎn)者 spring.rabbitmq.template.mandatory=true
-
消息發(fā)送測試
package com.gen; import com.gen.config.RabbitMQConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class GenRabbitmqApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void testReturnCallback() { this.rabbitTemplate.setReturnsCallback((returned) -> { System.out.println(returned); }); this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "您有新訂單?。?!"); // 模擬消息轉(zhuǎn)發(fā)隊列失敗 // this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "gen.order.new", "您有新訂單!!!"); } }
4.RabbitMQ消息確認(rèn)機(jī)制ACK
- 背景:消費(fèi)者從Broker中監(jiān)聽消息,需要確保消息被合理處理
-
RabbitMQ的ACK介紹
- 消費(fèi)者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除
- 消費(fèi)者在處理消息出現(xiàn)了網(wǎng)絡(luò)不穩(wěn)定、服務(wù)器異常等現(xiàn)象,那么就不會有ACK反饋,RabbitMQ會認(rèn)為這個消息沒有正常消費(fèi),會將消息重新放入隊列中
- 只有當(dāng)消費(fèi)者正確發(fā)送ACK反饋,RabbitMQ確認(rèn)收到后,消息才會從RabbitMQ服務(wù)器的數(shù)據(jù)中刪除
- 消息的ACK確認(rèn)機(jī)制默認(rèn)是打開的,消息如未被進(jìn)行ACK的消息確認(rèn)機(jī)制,這條消息被鎖定Unacked
-
確認(rèn)方式
- 自動確認(rèn)(默認(rèn))
- 手動確認(rèn)manual
-
配置文件開啟手動確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
消費(fèi)者代碼
package com.gen.listener; import com.gen.config.RabbitMQConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) public class OrderMQListener { @RabbitHandler public void orderConsumer(String msg, Message message, Channel channel) throws IOException { System.out.println(msg); System.out.println(message); System.out.println(channel); long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println(deliveryTag); // 成功確認(rèn),消費(fèi)成功 channel.basicAck(deliveryTag, false); // 拒絕后重新入隊 // channel.basicNack(deliveryTag, false,true); } }
-
deliveryTag介紹:表示消息投遞序號,每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會增加
-
basicNack和basicReject介紹文章來源:http://www.zghlxwxcb.cn/news/detail-830590.html
- basicReject一次只能拒絕接收一個消息,可以設(shè)置是否重新入隊requeue
- basicNack方法可以支持一次0個或者多個消息的拒收,可以設(shè)置是否重新入隊requeue
-
人工審核異常消息文章來源地址http://www.zghlxwxcb.cn/news/detail-830590.html
- 設(shè)置重試閾值,超過后確認(rèn)消費(fèi)成功,記錄消息,人工處理
到了這里,關(guān)于RabbitMQ消息可靠性投遞與ACK確認(rèn)機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!