目錄
1.如何保證消息的可靠性
1.1.消息的可靠投遞
confirm機制
return機制
1.2.如何保證消息在隊列中不丟失
1.3.確保消息能可靠的被消費掉
2.延遲隊列
2.1.TTL
2.2.死信隊列
2.3.延遲隊列
3.如何防止消費者重復(fù)消費消息
1.如何保證消息的可靠性
1.1.消息的可靠投遞
在生產(chǎn)環(huán)境中由于一些不明原因,導(dǎo)致 rabbitmq 重啟,在 RabbitMQ 重啟期間生產(chǎn)者消息投遞失敗,導(dǎo)致消息丟失,需要手動處理和恢復(fù)。于是,我們開始思考,如何才能進行 RabbitMQ 的消息可靠投遞呢?特別是在這樣比較極端的情況,RabbitMQ 集群不可用的時候,無法投遞的消息該如何處理呢?
在使用 RabbitMQ 的時候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。
- confirm 確認模式
- return 退回模式
- 消息從 producer 到 exchange 則會返回一個 confirmCallback 。
- 消息從 exchange-->queue 投遞失敗則會返回一個 returnCallback 。
默認rabbitmq不開啟上面兩種模式
我們將利用這兩個 callback 控制消息的可靠性投遞
confirm和return的實現(xiàn) ?
設(shè)置ConnectionFactory的publisher-confirm-type: correlated開啟 確認模式。
使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。當消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。
設(shè)置ConnectionFactory的publisher-returns="true" 開啟 退回模式。
使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),當消息從exchange路由到queue失敗后執(zhí)行回調(diào)函數(shù)returnedMessage。
confirm機制
演示:4.springboot整合RabbitMQ
(1).配置文件中開啟confirm
#開啟confirm確認機制 spring.rabbitmq.publisher-confirm-type=correlated
(2)設(shè)置rabbitTemplate的confirmCallback回調(diào)函數(shù)
/** * 使用confirm機制: * (1)需要開啟confirm機制。-----配置文件中加入:spring.rabbitmq.publisher-confirm-type=correlated * (2)為rabbitTemplate指定setConfirmCallback回調(diào)函數(shù) */ @Test void test001() { //只能保證消息從生產(chǎn)者到交換機的可靠性 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //觸發(fā)該方法 if (ack == false) { System.out.println("未來根據(jù)項目的需要,完成相應(yīng)的操作"); } } }); rabbitTemplate.convertAndSend("Topics-exchange", "lazy.aaa", "Hello RabbitMQ..."); }
return機制
(1).配置文件中開啟return
#開啟return機制 spring.rabbitmq.publisher-returns=true
(2)設(shè)置rabbitTemplate的return回調(diào)函數(shù)
/** * return機制: * (1)開啟return機制---配置文件加入:spring.rabbitmq.publisher-returns=true * (2)為rabbitTemplate設(shè)置return的回調(diào)函數(shù) */ @Test void test002() { //只有當消息無法從交換機到隊列時才會觸發(fā) rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //為交換機到隊列分發(fā)消息失敗時會觸發(fā) System.out.println("replyCode====" + replyCode); System.out.println("replyText====" + replyText); } }); rabbitTemplate.convertAndSend("Topics-exchange", "lazy.aaa", "Hello RabbitMQ..."); }
1.2.如何保證消息在隊列中不丟失
(1)設(shè)置隊列為持久化
(2)設(shè)置消息的持久化
1.3.確保消息能可靠的被消費掉
ACK確認機制
多個消費者同時收取消息,收取消息到一半,突然某個消費者掛掉,要保證此條消息不丟失,就需要acknowledgement機制,就是消費者消費完要通知服務(wù)端,服務(wù)端才將數(shù)據(jù)刪除
這樣就解決了,即使一個消費者出了問題,沒有同步消息給服務(wù)端,還有其他的消費端去消費,保證了消息不丟的case。
ACK的實現(xiàn)
ack指Acknowledge,確認。 表示消費端收到消息后的確認方式。
有三種確認方式:
- 自動確認:acknowledge="none"
- 手動確認:acknowledge="manual"
- 根據(jù)異常情況確認:acknowledge="auto"(這種方式使用麻煩,并且不常用)
其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應(yīng) message 從 RabbitMQ 的消息隊列中移除。但是在實際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會丟失。
如果設(shè)置了手動確認方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動重新發(fā)送消息。
消費端:
(1).修改消費端----手動確認消息
#修改消費端----手動確認消息 spring.rabbitmq.listener.simple.acknowledge-mode=manual
(2).修改代碼
??
package com.wqg.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.amqp.core.Message; import java.io.IOException; /** * @ fileName:MyListener * @ description: * @ author:wqg * @ createTime:2023/7/12 18:57 */ @Component public class MyListener { //basicAck:確認消息----rabbit服務(wù)端刪除 //basicNack:服務(wù)繼續(xù)發(fā)送消息 @RabbitListener(queues = {"Topics-queue002"})//queues:表示你監(jiān)聽的隊列名 public void h(Message message , Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); //把監(jiān)聽到的消息封裝到Message類對象中 byte[] body = message.getBody(); String s = new String(body); System.out.println("消息內(nèi)容==="+s); try { //int a = 10/0; //模擬宕機 System.out.println("核心業(yè)務(wù)的處理~~~"); /** *long deliveryTag: 消息的標注 * boolean multiple:是否把該消息之前未確認的消息一起確認掉 */ channel.basicAck(deliveryTag,true);//確認消息 } catch (Exception e) { /** * long deliveryTag; boolean multiple; * boolean requeue:是否要求rabbitmq服務(wù)重新發(fā)送該消息 */ channel.basicNack(deliveryTag,true,true); } } }
總結(jié): 如何保證消息的可靠性?
- 保證消息的可靠性投遞: confirm機制和return機制
- 隊列中:---持久化
- 使用ack機制保證消費者的可靠性消費。
2.延遲隊列
2.1.TTL
TTL 全稱 Time To Live(存活時間/過期時間)。
當消息到達存活時間后,還沒有被消費,會被自動清除。
RabbitMQ可以對消息設(shè)置過期時間,也可以對整個隊列(Queue)設(shè)置過期時間。
演示:
使用圖形化界面創(chuàng)建
?生產(chǎn)者測試
?
?結(jié)果
?
隊列設(shè)置了過期時間而消息也設(shè)置了過期時間-----按照時間短的執(zhí)行?
小結(jié):
- 設(shè)置隊列過期時間使用參數(shù):x-message-ttl,單位:ms(毫秒),會對整個隊列消息統(tǒng)一過期。
- 設(shè)置消息過期時間使用參數(shù):expiration。單位:ms(毫秒),當該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。
- 如果兩者都進行了設(shè)置,以時間短的為準。
2.2.死信隊列
死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以被重新發(fā)送到另一個交換機,這個交換機就是DLX。
什么樣的消息會成為死信消息:
- 隊列消息長度到達限制
- 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false
- 原隊列存在消息過期設(shè)置,消息到達超時時間未被消費
隊列綁定死信交換機:
演示:
使用圖形化界面創(chuàng)建:
?
?
?
?生產(chǎn)者測試
?結(jié)果
?
2.3.延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
需求:
下單后,30分鐘未支付,取消訂單,回滾庫存。
新用戶注冊成功7天后,發(fā)送短信問候。
實現(xiàn)方式:
定時器:性能差---每隔一段時間要進行數(shù)據(jù)庫查詢。
延遲隊列
通過消息隊列完成延遲隊列的功能:
- 在RabbitMQ中并未提供延遲隊列功能。
- 但是可以使用:TTL+死信隊列 組合實現(xiàn)延遲隊列的效果。
?
演示:
隊列為空
?
創(chuàng)建springboot項目
配置文件
#rabbitmq的配置 spring.rabbitmq.host=192.168.75.129 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
(1). 引入依賴
<dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
(2).創(chuàng)建OrderController.java模擬訂單
package com.wqg.controller; import com.alibaba.fastjson.JSON; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.UUID; /** * @ fileName:OrderController * @ description:訂單 * @ author:wqg * @ createTime:2023/7/13 16:24 */ @RestController @RequestMapping("/order") public class OrderController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/saveOrder") public String save(Integer pid, Integer num) { //生成一個訂單號 String orderId = UUID.randomUUID().toString().replace("-", ""); System.out.println("下單成功,訂單號為===" + orderId); HashMap<Object, Object> map = new HashMap<>(); map.put("orderId", orderId); map.put("pid", pid); map.put("num", num); rabbitTemplate.convertAndSend("pt_exchange", "qy165.aaa", JSON.toJSONString(map)); return "下單成功"; } }
(3).創(chuàng)建MyListener.java模擬監(jiān)聽
package com.wqg.listener; import com.alibaba.fastjson.JSON; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.HashMap; /** * @ fileName:MyListener * @ description: * @ author:wqg * @ createTime:2023/7/13 16:35 */ @Component public class MyListener { @RabbitListener(queues = {"dead_queue"}) public void hello(Message message){ byte[] body = message.getBody(); String s = new String(body); HashMap hashMap = JSON.parseObject(s, HashMap.class); System.out.println("message==="+hashMap); //取消訂單 System.out.println("取消訂單號為==="+hashMap.get("orderId")); } }
測試
控制臺
?
?
3.如何防止消費者重復(fù)消費消息
消息的冪等性---無論操作幾次結(jié)果都是一樣。
- 生成全局id,存入redis或者數(shù)據(jù)庫,在消費者消費消息之前,查詢一下該消息是否有消費過。
- 如果該消息已經(jīng)消費過,則告訴mq消息已經(jīng)消費,將該消息丟棄(手動ack)。
- 如果沒有消費過,將該消息進行消費并將消費記錄寫進redis或者數(shù)據(jù)庫中。
?簡單描述一下需求,如果訂單完成之后,需要為用戶累加積分,又需要保證積分不會重復(fù)累加。那么再mq消費消息之前,先去數(shù)據(jù)庫查詢該消息是否已經(jīng)消費,如果已經(jīng)消費那么直接丟棄消息。?
演示:
生產(chǎn)者文章來源:http://www.zghlxwxcb.cn/news/detail-557630.html
import com.alibaba.fastjson.JSONObject; import com.xiaojie.score.entity.Score; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.util.UUID; /** * @author * @version 1.0 * @description:發(fā)送積分消息的生產(chǎn)者 * @date */ @Component @Slf4j public class ScoreProducer implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; //定義交換機 private static final String SCORE_EXCHANGE = "ykq_score_exchaneg"; //定義路由鍵 private static final String SCORE_ROUTINNGKEY = "score.add"; /** * @description: 訂單完成 * @param: * @return: java.lang.String * @author xiaojie * @date: */ public String completeOrder() { String orderId = UUID.randomUUID().toString(); System.out.println("訂單已完成"); //發(fā)送積分通知 Score score = new Score(); score.setScore(100); score.setOrderId(orderId); String jsonMSg = JSONObject.toJSONString(score); sendScoreMsg(jsonMSg, orderId); return orderId; } /** * @description: 發(fā)送積分消息 * @param: * @param: message * @param: orderId * @return: void * @author * @date: */ @Async public void sendScoreMsg(String jsonMSg, String orderId) { this.rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend(SCORE_EXCHANGE, SCORE_ROUTINNGKEY, jsonMSg, message -> { //設(shè)置消息的id為唯一 message.getMessageProperties().setMessageId(orderId); return message; }); } @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { if (ack) { log.info(">>>>>>>>消息發(fā)送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s); } else { log.info(">>>>>>>消息發(fā)送失敗{}", ack); } } }
消費者文章來源地址http://www.zghlxwxcb.cn/news/detail-557630.html
import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import com.xiaojie.score.entity.Score; import com.xiaojie.score.mapper.ScoreMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; /** * @author * @version 1.0 * @description: 積分的消費者 * @date */ @Component @Slf4j public class ScoreConsumer { @Autowired private ScoreMapper scoreMapper; @RabbitListener(queues = {"ykq_score_queue"}) public void onMessage(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException { String orderId = message.getMessageProperties().getMessageId(); if (StringUtils.isBlank(orderId)) { return; } log.info(">>>>>>>>消息id是:{}", orderId); String msg = new String(message.getBody()); Score score = JSONObject.parseObject(msg, Score.class); if (score == null) { return; } //執(zhí)行前去數(shù)據(jù)庫查詢,是否存在該數(shù)據(jù),存在說明已經(jīng)消費成功,不存在就去添加數(shù)據(jù),添加成功丟棄消息 Score dbScore = scoreMapper.selectByOrderId(orderId); if (dbScore != null) { //證明已經(jīng)消費消息,告訴mq已經(jīng)消費,丟棄消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } Integer result = scoreMapper.save(score); if (result > 0) { //積分已經(jīng)累加,刪除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } else { log.info("消費失敗,采取相應(yīng)的人工補償"); } } }
到了這里,關(guān)于如何保證消息的可靠性+延遲隊列(TTL+死信隊列+延遲隊列)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!