提示:以下是本篇文章正文內(nèi)容,RabbitMQ 系列學習將會持續(xù)更新
官網(wǎng):https://www.rabbitmq.com
一、消息確認機制
RabbitMQ 消息確定主要分為兩部分:
- 第一種是消息發(fā)送確認。這種是用來確認生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。
- 確認發(fā)送的第一步是確認是否到達交換器。
- 確認發(fā)送的第二步是確認是否到達隊列。
- 第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。
??1.1 消息發(fā)送確認(生產(chǎn)者)
- 消息從 producer 到 exchange 則會返回一個
confirmCallback
。 - 消息從 exchange 到 queue 投遞失敗則會返回一個
returnCallback
。
??confirm 確認模式
①開啟確認模式
spring:
rabbitmq:
addresses: 1.15.76.95
username: admin
password: 123456
virtual-host: /test
# 消息可靠傳遞: 開啟確認模式
publisher-confirm-type: correlated
publisher-confirm-type: none
:表示禁用發(fā)布確認模式,默認值,使用此模式之后,不管消息有沒有發(fā)送到 Broker 都不會觸發(fā) ConfirmCallback 回調(diào)。publisher-confirm-type: correlated
:表示消息成功到達 Broker 后觸發(fā) ConfirmCalllBack 回調(diào)。publisher-confirm-type: simple
:如果消息成功到達 Broker 后一樣會觸發(fā) ConfirmCalllBack 回調(diào),發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來判定下一步的邏輯。
注意:waitForConfirmsOrDie 方法如果返回 false 則會關(guān)閉 channel 信道,則接下來無法發(fā)送消息到 broker。
②設(shè)置 ConfirmCallback
函數(shù),然后發(fā)送消息。
@Test
void publisher1() {
// 1.定義回調(diào)
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* confirm方法參數(shù):
* @param correlationData 相關(guān)配置信息
* @param ack 交換機是否成功收到消息
* @param cause 交換機接收失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("執(zhí)行confirm()方法: ");
if(ack) {
System.out.println("交換機成功收到消息!");
}else {
System.out.println("交換機沒有收到消息,失敗原因: " + cause);
}
}
});
// 2.發(fā)送消息
template.convertAndSend("amq.direct", "my-yyds", "Hello,World");
// 休眠一會兒
Thread.sleep(10000);
}
③運行測試:
a. 我們成功發(fā)送了消息,但是控制臺依然返回了 ack=false。
原因:當發(fā)送方法結(jié)束,RabbitMQ 相關(guān)的資源也就關(guān)閉了。雖然我們的消息發(fā)送出去,但異步的 ConfirmCallback 卻由于資源關(guān)閉無法返回確認信息。
b. 我們可以讓主線程休眠一會兒,等 callback 返回確認信息后再關(guān)閉資源。
回到目錄…文章來源:http://www.zghlxwxcb.cn/news/detail-409255.html
??return 回退模式
①開啟回退模式
spring:
rabbitmq:
addresses: 1.15.76.95
username: admin
password: 123456
virtual-host: /test
# 消息可靠傳遞: 開啟確認模式
publisher-confirm-type: correlated
# 消息可靠傳遞: 開啟回退模式
publisher-returns: true
②設(shè)置 Exchange 處理消息的模式:如果消息沒有路由到 Queue
- 默認方式:丟棄消息 。
- 如果設(shè)置了
rabbitTemplate.setMandatory(true)
參數(shù),則會將消息退回給 producer。并執(zhí)行回調(diào)函數(shù) returnedMessage。
③設(shè)置 ReturnCallBack
函數(shù)
@Test
void publisher2() throws InterruptedException {
// 1.設(shè)置交換機處理失敗消息的模式
template.setMandatory(true);
// 2.設(shè)置ReturnCallBack函數(shù)
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息對象: " + returnedMessage.getMessage());
System.out.println("錯誤碼: " + returnedMessage.getReplyCode());
System.out.println("錯誤信息: " + returnedMessage.getReplyText());
System.out.println("交換機: " + returnedMessage.getExchange());
System.out.println("路由鍵: " + returnedMessage.getRoutingKey());
}
});
// 3.發(fā)送消息
template.convertAndSend("amq.direct", "my-yyds1", "Hello,World");
Thread.sleep(10000);
}
④運行測試:可以看到路由錯誤。
回到目錄…
??1.2 消息接收確認(消費者)
??none 自動確認
??acknowledge-mode=none
,默認方式,默認消費者能正確處理所有請求。
??auto 異常確認
??acknowledge-mode=auto
,根據(jù)異常情況決定處理方式。
- 如果消費者在消費的過程中沒有拋出異常,則自動確認。
- 當消費者消費的過程中拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且該消息不會重回隊列。
- 當拋出 ImmediateAcknowledgeAmqpException 異常,消息會被確認。
- 如果拋出其他的異常,則消息會被拒絕,但是與前兩個不同的是,該消息會重回隊列,如果此時只有一個消費者監(jiān)聽該隊列,那么該消息重回隊列后又會推送給該消費者,會造成死循環(huán)的情況。
??manual 手動確認
- 在該模式下,消費者消費消息后需要根據(jù)消費情況給 Broker 返回一個回執(zhí),是確認
ack
使 Broker 刪除該條已消費的消息,還是失敗確認返回nack
,還是拒絕該消息。 - 開啟手動確認后,消費者接收到消息后必須返回
ack
,只有 RabbitMQ 接收到ack
后,消息才會從隊列中被刪除。如果不調(diào)用 channel.basicAck() 會出現(xiàn)該消息被重復(fù)消費的情況。
①設(shè)置手動簽收,acknowledge =manual
spring:
rabbitmq:
addresses: 1.15.76.95
username: admin
password: 123456
virtual-host: /test
listener:
# 設(shè)置監(jiān)聽器類型,如不設(shè)置將會默認為SimpleRabbitListenerContainerFactory,那么下面的direct配置不生效
type: direct
direct:
# 設(shè)置消費端手動確認
acknowledge-mode: manual
②監(jiān)聽器類實現(xiàn) ChannelAwareMessageListener
接口
- 目的是為了實現(xiàn)它的 onMessage(Message message, Channel channel) 方法,因為我們需要 channel 連接器。
- 也可以不實現(xiàn)接口,直接在定義接收方法時,引入 Channel 參數(shù)。
③如果消息成功處理,則調(diào)用 channel.basicAck()
簽收。
④如果消息處理失敗,則調(diào)用 channel.basicNack()
拒絕簽收,broker 重新發(fā)送給 consumer。
@Component
public class AckListener {
@RabbitListener(queues = "yyds")
public void onMessage(Message message, Channel channel) throws Exception {
// 先獲取當前的消息標簽
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1.接收消息
System.out.println(new String(message.getBody()));
// 2.處理業(yè)務(wù)邏輯
System.out.println("處理業(yè)務(wù)邏輯...");
//int a = 3 / 0; //模擬出錯的場景
// 3.手動簽收
channel.basicAck(deliveryTag, false);
}catch(Exception e) {
// 4.拒絕簽收
System.out.println("消息接收失敗,讓消息重回隊列");
channel.basicNack(deliveryTag, false, true);
}
}
}
消息確認的方法 | 參數(shù)一 | 參數(shù)二 | 參數(shù)三 | 描述 |
---|---|---|---|---|
channel.basicAck(deliverTag, true) |
消息標簽 | 是否接收多條消息 | ——— | 確認應(yīng)答 |
channel.basicNack(deliverTag, false, true) |
消息標簽 | 是否接收多條消息 | 是否讓消息重回隊列 | 拒絕應(yīng)答 |
channel.basicReject(deliverTag, false) |
消息標簽 | 是否讓消息重回隊列 | ——— | 拒絕消息 |
channel.basicRecover(true) |
是否恢復(fù)給其它消費者 | ——— | ——— | 恢復(fù)消息 |
⑤啟動測試:
如果接收者成功消費消息,則沒有任何返回。
如果接收者消費消息失敗,則采用 Nack 讓消息重回隊列,并且自己可以再次消費重回后的消息。
回到目錄…
二、消費端限流 (prefetch)
和單純 ACK 區(qū)別:ACK 會一次拉取所有消息,而 prefetch 可以設(shè)置每次消費的消息數(shù)量
- 用于指定消費端處理消息的速率。
- 用于保證系統(tǒng)穩(wěn)定性,削峰填谷時的處理速率。
①該功能必須設(shè)置 ACK 方式:手動確認 acknowledge-mode: manual
且 prefetch=條數(shù)
spring:
rabbitmq:
listener:
type: direct
direct:
acknowledge-mode: manual #手動確認
prefetch: 3 #消費者每次消費的數(shù)量
②我們先批量插入20條消息
@Test
void publisher() {
for (int i = 0; i < 20; i++) {
template.convertAndSend("amq.direct", "my-yyds", "Hello,World: " + i);
}
}
③設(shè)置監(jiān)聽器:消費者手動確認消息
@Component
public class QosListener {
@RabbitListener(queues = "yyds")
public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
// 1.接收消息
System.out.println("接收消息: " + new String(message.getBody()));
// 2.處理業(yè)務(wù)
System.out.println("處理業(yè)務(wù)中...");
Thread.sleep(1000);
// 3.簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //設(shè)置參數(shù)可以簽收多條消息
}
}
④啟動測試:可以看到我們接收到的消息是3條/批
回到目錄…
三、設(shè)置隊列參數(shù)
??3.1 消息TTL過期
RabbitMQ 支持將超過一定時間沒被消費的消息自動刪除,這需要消息設(shè)定 TTL
值,如果消息的存活時間超過了 Time To Live
值,就會被自動刪除。如果有死信隊列,那么就會進入到死信隊列中。
①直接給隊列設(shè)定 TTL 值(毫秒為單位): 過期會刪除隊列中的所有消息。
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct")
.deadLetterRoutingKey("dl-yyds")
.ttl(10000) //如果10秒沒處理,就自動刪除
.build();
}
現(xiàn)在我們刪除之前的 yyds 隊列再重啟測試:可以發(fā)現(xiàn) yyds 隊列已經(jīng)具有 TTL 特性了。
②生產(chǎn)者給某個消息設(shè)定過期時間: 使用 expiration
參數(shù)。只有消息在雙端時,才是計時。
@Test
void publisher() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000"); //設(shè)置過期時間 ms
return message;
}
};
template.convertAndSend("amq.direct", "my-yyds", "Hello,TLL", messagePostProcessor);
}
現(xiàn)在刪除之前的 yyds 隊列再重啟測試:發(fā)現(xiàn)隊列并沒有 TTL 特性。
只是我們發(fā)布的消息會過期,不影響其它消息。
這種情況就不會過期:當該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。
回到目錄…
??3.2 隊列最大長度
我們來看一下當消息隊列長度達到最大的情況,現(xiàn)在我們將消息隊列的長度進行限制:
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.maxLength(3) //將最大長度設(shè)定為3
.build();
}
現(xiàn)在我們重啟一下:可以發(fā)現(xiàn) yyds 隊列已經(jīng)具有 Limit 特性了。
我們向 yyds 隊列中依次插入4條消息:person-1
、person-2
、person-3
、person-4
可以看到因為長度限制為3,所以最開始的消息直接被丟棄了。
回到目錄…
四、死信隊列
使用場景:
?● 如果消息隊列中的數(shù)據(jù)遲遲沒有消費者來處理,那么就會一直占用消息隊列的空間。
?● 比如我們模擬一下?lián)屲嚻钡膱鼍埃脩粝聠胃哞F票之后,會進行搶座,然后再進行付款,但是如果用戶下單之后并沒有及時的付款,這張票不可能一直讓這個用戶占用著,因為你不買別人還要買呢,所以會在一段時間后超時,讓這張票可以繼續(xù)被其他人購買。
?● 這時,我們就可以使用死信隊列,將那些用戶超時未付款的或是用戶主動取消的訂單,進行進一步的處理。
消息成為死信的三種情況:
?● 消息被拒絕 (basic.reject
/ basic.nack
),并且 requeue = false
不重回隊列
?● 消息TTL過期
?● 隊列達到最大長度
那么如何構(gòu)建這樣的模式呢?
?實際上本質(zhì)就是一個死信交換機 + 綁定的死信隊列。當正常隊列中的消息被判定為死信時,會被發(fā)送到對應(yīng)的死信交換機,然后再通過交換機發(fā)送到死信隊列中,死信隊列也有對應(yīng)的消費者去處理消息。
??4.1 構(gòu)建死信隊列
①這里我們直接在配置類中創(chuàng)建一個新的死信交換機和死信隊列,并進行綁定:
@Configuration
public class RabbitConfiguration {
@Bean("directDlExchange") //創(chuàng)建一個新的死信交換機
public Exchange dlExchange() {
return ExchangeBuilder.directExchange("dlx.direct").build();
}
@Bean("yydsDlQueue") //創(chuàng)建一個新的死信隊列
public Queue dlQueue() {
return QueueBuilder
.nonDurable("dl-yyds") //隊列名稱
.build();
}
@Bean("dlBinding") //死信交換機和死信隊列進綁定
public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange,
@Qualifier("yydsDlQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dl-yyds") //自定義routingKey
.noargs();
}
@Bean("directExchange")
.......................
@Bean("yydsQueue") //在普通消息隊列中指定死信交換機
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct") //指定死信交換機
.deadLetterRoutingKey("dl-yyds") //指定死信RoutingKey
.build();
}
@Bean("binding")
................
}
②接著我們將監(jiān)聽器修改為死信隊列監(jiān)聽:
@Component
public class ConsumeListener {
@RabbitListener(queues = "dl-yyds", messageConverter = "jacksonConverter")
public void receiver(User user){
System.out.println("死信隊列監(jiān)聽器: " + user);
}
}
啟動一下:我們可以看到多了一個死信交換機。
隊列列表中也多了一個死信隊列,并且 yyds 隊列也支持死信隊列發(fā)送功能了。
回到目錄…
??4.2 模擬死信消息
場景一:消息被拒絕
現(xiàn)在我們先向 yyds 隊列中發(fā)送一個消息:Hello,World
然后我們?nèi)∠⒌臅r候拒絕消息,并且不讓消息重新排隊:
可以看到拒絕后,如果不讓消息重新排隊,那么就會直接被丟進死信隊列中:
場景二:消息TTL過期
如果消息的存活時間超過了 Time To Live
值,就會被自動刪除。如果有死信隊列,那么就會進入到死信隊列中。
現(xiàn)在我們將 yyds 消息隊列設(shè)定 TTL 值(毫秒為單位):
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct")
.deadLetterRoutingKey("dl-yyds")
.ttl(10000) //如果10秒沒處理,就自動刪除
.build();
}
啟動測試:我們可以看到 死信隊列 和 普通隊列 (TTL
、DLX
死信交換機、DLK
死信routingKey):
我們向 yyds 隊列中插入一個新的消息:Hello,World
可以看到消息10秒鐘之后就不見了,而是被丟進了死信隊列中。
場景三:隊列達到最大長度
最后我們來看一下當消息隊列長度達到最大的情況,現(xiàn)在我們將消息隊列的長度進行限制:
@Bean("yydsQueue")
public Queue queue(){
return QueueBuilder
.nonDurable("yyds")
.deadLetterExchange("dlx.direct")
.deadLetterRoutingKey("dl-yyds")
.maxLength(3) //將最大長度設(shè)定為3
.build();
}
啟動測試:我們可以看到 死信隊列 和 普通隊列(Lim
、DLX
、DLK
):
我們向 yyds 隊列中依次插入4條消息:Message-1
、Message-2
、Message-3
、Message-4
可以看到因為長度限制為3,所以最開始的消息直接被丟進了死信隊列中了。
回到目錄…
??4.3 實現(xiàn)延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
?場景一:下單后,30分鐘未支付,取消訂單,回滾庫存。
?場景二:新用戶注冊成功7天后,發(fā)送短信問候。
我們可以使用 TTL+死信隊列
組合實現(xiàn)延遲隊列的效果。
這里就不演示 TTL+死信隊列
的組合了,和上面一樣,我們來實現(xiàn)一下消費者端吧!
@Component
public class DlxListener {
@RabbitListener(queues = "dl-yyds")
public void receiver(Message message, Channel channel) throws Exception {
try {
System.out.println("死信隊列監(jiān)聽器: " + new String(message.getBody()));
System.out.println("======== 業(yè)務(wù)處理中 ==========");
System.out.println("根據(jù)訂單id查詢狀態(tài):");
System.out.println(" 如果訂單支付成功,向物流系統(tǒng)發(fā)送發(fā)貨的請求。");
System.out.println(" 如果訂單支付失敗,取消訂單,回滾庫存。");
System.out.println("======== 確認消息簽收 ==========");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e) {
System.out.println("出現(xiàn)異常,拒絕簽收, 讓消息重回隊列");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
啟動測試:
手動向 yyds 隊列中發(fā)一條消息:訂單:{id:1, 狀態(tài):xxx, 下單時間: yyyy-m-d HH:MM:SS}
消息過期后,成功被死信隊列監(jiān)聽到。
回到目錄…
五、消息追蹤
??5.1 Firehose: amq.rabbitmq.trace
- Firehose 的機制是將生產(chǎn)者投遞給 RabbitMQ 的消息,RabbitMQ 投遞給消費者的消息按照指定的格式發(fā)送到默認的 Exchange 上。
- 這個默認的 Exchange 的名稱為
amq.rabbitmq.trace
,它是一個 topic 類型的內(nèi)部交換機。 - 發(fā)送到該交換機上的消息的 routingKey 為
publish.exchangename
(生產(chǎn)者發(fā)布的消息) 和deliver.queuename
(消費者獲取的消息)。
應(yīng)用:amq.rabbitmq.trace 交換機實現(xiàn)消息追蹤
可以看到它也是 topic
類型的,它是一個內(nèi)部交換機,用于幫助我們記錄和追蹤生產(chǎn)者和消費者使用消息隊列的交換機。
①首先,我們需要在控制臺將虛擬主機 /test
的追蹤功能開啟:
rabbitmqctl trace_on -p /test
②創(chuàng)建一個 trace 消息隊列用于接收記錄:
③我們給 amq.rabbitmq.trace 交換機綁定上剛剛的隊列: 因為該交換機是內(nèi)部的,所以只能在 Web 管理頁面中綁定
由于發(fā)送到此交換機上的 routingKey 為 publish.交換機名稱
和 deliver.隊列名稱
,分別對應(yīng)生產(chǎn)者投遞到交換機的消息,和消費者從隊列上獲取的消息,因此這里使用 #
通配符進行綁定。
④現(xiàn)在我們來測試一下,往 yyds 隊列中發(fā)送消息: 會發(fā)現(xiàn) trace 隊列中多了2條信息。
通過追蹤,我們可以很明確地得知消息發(fā)送的交換機、routingKey、用戶等信息,包括信息本身:
同樣的,消費者在取出數(shù)據(jù)時也有記錄:我們可以明確消費者的地址、端口、具體操作的隊列以及取出的消息信息等。
回到目錄…
??5.2 rabbitmq_tracing 插件
rabbitmq_tracing 和 Firehose 在實現(xiàn)上如出一轍,只不過 rabbitmq_tracing 的方式比 Firehose 多了一
層GUI
的包裝,更容易使用和管理。
缺點:會降低 RabbitMQ 的整體性能,不適用于生產(chǎn)開發(fā)中,適用于測試和調(diào)試階段。
①啟用插件
# 查看插件列表
rabbitmq-plugins list
# 啟動插件
rabbitmq-plugins enable rabbitmq_tracing
②創(chuàng)建 Trace 追蹤隊列
同時多了一條隊列,點擊查看發(fā)現(xiàn):實際上它也是綁定了我們的 amq.rabbitmq.trace
交換機。
③測試:向 yyds 隊列中發(fā)布并消費消息,然后查看 Trace log
日志 (消息的生產(chǎn)者和消費者)
回到目錄…
六、應(yīng)用問題
??6.1 消息可靠性保障
??需求:確保消息 100% 發(fā)送成功。
??消息補償機制
??6.2 消息冪等性保障
??需求:在MQ中,對于相同的消息消費一次和消費多次,最終的結(jié)果一致。
??樂觀鎖解決方案
回到目錄…
總結(jié):
提示:這里對文章進行總結(jié):
本文是對RabbitMQ高級特性的學習,我們首先學習了消息確認機制和消費端限流的方法,又通過設(shè)置隊列的參數(shù)實現(xiàn)了死信隊列和延時隊列。后面又介紹了兩種方式實現(xiàn)消息追蹤,最后也介紹了消息可靠性和消息冪等性的解決方案。之后的學習內(nèi)容將持續(xù)更新?。?!文章來源地址http://www.zghlxwxcb.cn/news/detail-409255.html
到了這里,關(guān)于RabbitMQ之高級特性的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!