??本博客為個人學習筆記,學習網(wǎng)站:2023黑馬程序員RabbitMQ入門到實戰(zhàn)教程?高級篇章節(jié)
目錄
生產(chǎn)者可靠性
生產(chǎn)者重連機制
生產(chǎn)者確認機制
介紹
實現(xiàn)
總結(jié)與建議
MQ可靠性
數(shù)據(jù)持久化?
LazyQueue
消費者可靠性
消費者確認機制
失敗重試機制
失敗重試策略?
業(yè)務冪等性
何為冪等
唯一id
業(yè)務判斷
兜底方案
總結(jié)
生產(chǎn)者可靠性
生產(chǎn)者重連機制
注意:生產(chǎn)者重連機制只有在連接MQ失敗的時候進行重連,消息發(fā)送失敗時并不會重連
配置代碼如下:
spring:
rabbitmq:
connection-timeout: 1s # 設置MQ的連接超時時間
template:
retry:
enabled: true # 開啟超時重試機制
initial-interval: 1000ms # 失敗后的初始等待時間
multiplier: 1 # 失敗后下次的等待時長倍數(shù),下次等待時長 = initial-interval * multiplier
max-attempts: 3 # 最大重試次數(shù)
測試:?
利用docker命令停掉RabbitMQ服務,再進行消息發(fā)送測試。結(jié)果如下圖所示,總共重試了3次,證明消息發(fā)送的超時重試機制配置成功。
生產(chǎn)者確認機制
介紹
其中ack和nack屬于Publisher Confirm機制,ack是投遞成功;nack是投遞失敗。而return(消息到達交換機,但是路由失敗,如:匹配不到routingKey)則屬于Publisher Return機制。
其中,Publisher Confirm又分為兩種類型,一種是同步阻塞等待MQ的回執(zhí),另一種是MQ異步回調(diào)返回回執(zhí),即同步與異步兩種。
默認兩種機制都是關閉狀態(tài),需要通過配置文件來開啟。?
實現(xiàn)
實現(xiàn)生產(chǎn)者確認機制:
步驟1:在publisher模塊的application.yaml中添加配置
spring:
rabbitmq:
publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型
publisher-returns: true # 開啟publisher return機制
這里publisher-confirm-type有三種模式可選:
none:關閉confirm機制
simple:同步阻塞等待MQ的回執(zhí)
correlated:MQ異步回調(diào)返回回執(zhí)
一般我們推薦使用correlated,回調(diào)機制。?
步驟2:編寫回調(diào)函數(shù),定義ReturnCallback
每個RabbitTemplate只能配置一個ReturnCallback,因此我們可以在配置類中統(tǒng)一設置。我們在publisher模塊定義一個配置類:
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("觸發(fā)return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
步驟三:定義ConfirmCallback?
由于每個消息發(fā)送時的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時定義。具體來說,是在調(diào)用RabbitTemplate中的convertAndSend方法時,多傳遞一個參數(shù):
這里的CorrelationData中包含兩個核心的東西:
????????id:消息的唯一標示,MQ對不同的消息的回執(zhí)以此做判斷,避免混淆
????????SettableListenableFuture:回執(zhí)結(jié)果的Future對象
將來MQ的回執(zhí)就會通過這個Future來返回,我們可以提前給CorrelationData中的Future添加回調(diào)函數(shù)來處理消息回執(zhí):
我們新建一個測試,向系統(tǒng)自帶的交換機發(fā)送消息,并且添加ConfirmCallback:
@Test
void testPublisherConfirm() {
// 1.創(chuàng)建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.給Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future發(fā)生異常時的處理邏輯,基本不會觸發(fā)
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內(nèi)容
if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執(zhí),false 代表 nack回執(zhí)
log.debug("發(fā)送消息成功,收到 ack!");
}else{ // result.getReason(),String類型,返回nack時的異常描述
log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.發(fā)送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
成功接收消息結(jié)果:
把routingKey故意寫錯,路由失敗結(jié)果:?
可以看到,由于傳遞的RoutingKey是錯誤的,路由失敗后,觸發(fā)了return callback,同時也收到了ack。 當我們修改為正確的RoutingKey以后,就不會觸發(fā)return callback了,只收到ack。 而如果連交換機都是錯誤的,則只會收到nack。
交換機名填錯結(jié)果:
總結(jié)與建議
總結(jié)
開啟生產(chǎn)者確認比較消耗MQ性能,一般不建議開啟。
而且大家思考一下觸發(fā)確認的幾種情況:
1. 路由失敗:一般是因為RoutingKey錯誤導致,往往是編程導致
2. 交換機名稱錯誤:同樣是編程錯誤導致
3. MQ內(nèi)部故障:這種需要處理,但概率往往較低。因此只有對消息可靠性要求非常高的業(yè)務才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了
建議
1. 生產(chǎn)者確認需要額外的網(wǎng)絡和系統(tǒng)資源開銷,盡量不要使用
2.?如果一定要使用,無需開啟Publisher-Return機制,因為一般路由失敗是自己業(yè)務問題
3. 對于nack消息可以有限次數(shù)重試,依然失敗則記錄異常消息
MQ可靠性
消息到達MQ以后,如果MQ不能及時保存,也會導致消息丟失,所以MQ的可靠性也非常重要。
數(shù)據(jù)持久化?
為了提升性能,默認情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲的臨時數(shù)據(jù),重啟后就會消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:交換機持久化、隊列持久化、消息持久化。
而spring在聲明創(chuàng)建交換機和隊列的時候,會默認設置為持久化模式,在發(fā)送消息時,也會默認將消息持久化。
在MQ管理平臺新增交換機或隊列的時候可以選擇持久化模式,如果在MQ選擇發(fā)送一條持久化消息時,需要配置properties值為2。
注意
在開啟持久化機制以后,如果同時還開啟了生產(chǎn)者確認,那么MQ會在消息持久化以后才發(fā)送ACK回執(zhí),進一步確保消息的可靠性。
不過出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫的,而是每隔一段時間批量持久化。一般間隔在100毫秒左右,這就會導致ACK有一定的延遲,因此建議生產(chǎn)者確認全部采用異步方式。
LazyQueue
在默認情況下,RabbitMQ會將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會導致消息積壓,比如:
1. 消費者宕機或出現(xiàn)網(wǎng)絡故障
2. 消息發(fā)送量激增,超過了消費者處理速度
3. 消費者處理業(yè)務發(fā)生阻塞
一旦出現(xiàn)消息堆積問題,RabbitMQ的內(nèi)存占用就會越來越高,直到觸發(fā)內(nèi)存預警上限。此時RabbitMQ會將內(nèi)存消息刷到磁盤上,這個行為成為PageOut. PageOut會耗費一段時間,并且會阻塞隊列進程。因此在這個過程中RabbitMQ不會再處理新的消息,生產(chǎn)者的所有請求都會被阻塞。
為了解決這個問題,從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,也就是惰性隊列。惰性隊列的特征如下:
1. 接收到消息后直接存入磁盤而非內(nèi)存
2. 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存(也就是懶加載)
3. 支持數(shù)百萬條的消息存儲
而在3.12版本之后,LazyQueue已經(jīng)成為所有隊列的默認格式。因此官方推薦升級MQ為3.12版本或者所有隊列都設置為LazyQueue模式。
下面介紹一下LazyQueue模式的設置:
在MQ管理平臺新增隊列時,設置為LazQueue模式:
代碼配置隊列為LazQueue模式:
1. 基于Bean聲明隊列的設置方式
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 開啟Lazy模式
.build();
}
2. 基于注解聲明隊列的設置方式
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
3.?對于已經(jīng)存在的隊列,也可以配置為lazy模式,但是要通過設置policy實現(xiàn)。 可以基于命令行設置policy
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
消費者可靠性
消費者確認機制
一般reject方式用的較少,除非是消息格式有問題,那就是開發(fā)問題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過try catch機制捕獲,消息處理成功時返回ack,處理失敗時返回nack。
由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實現(xiàn)了消息確認。并允許我們通過配置文件設置ACK處理方式,有三種模式:
none:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
manual:手動模式。需要自己在業(yè)務代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務入侵,但更靈活
auto:自動模式。
SpringAMQP利用AOP對我們的消息處理邏輯做了環(huán)繞增強,當業(yè)務正常執(zhí)行時則自動返回ack。當業(yè)務出現(xiàn)異常時,根據(jù)異常判斷返回不同結(jié)果:
如果是業(yè)務異常,會自動返回nack;
如果是消息處理或校驗異常,自動返回reject;
返回Reject的常見異常有:
Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:
o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
java.lang.NoSuchMethodException: Added in version 1.6.3.
java.lang.ClassCastException: Added in version 1.6.3.
通過下面的配置可以修改SpringAMQP的ACK處理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做處理
測試
修改consumer服務的SpringRabbitListener類中的方法,模擬一個消息處理的異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消費者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
//throw new RuntimeException("故意的");
}
log.info("消息處理完成");
}
測試可以發(fā)現(xiàn):當消息處理發(fā)生異常時,消息依然被RabbitMQ刪除了。
我們把確認機制修改為auto,放行以后,由于拋出的是消息轉(zhuǎn)換異常,因此Spring會自動返回reject,所以消息依然會被刪除。
我們重新將拋出異常改為RuntimeException類型,然后再次發(fā)送消息測試, 由于拋出的是業(yè)務異常,所以Spring返回ack,最終消息恢復至Ready狀態(tài),并且沒有被RabbitMQ刪除。
總結(jié):
當我們把配置改為auto時,
消息處理成功:
????????自動返回ack,并將消息刪除。
消息處理失?。?br> ????????如果是業(yè)務異常,會自動返回nack,并將消息重新投遞到消費者;
????????如果是消息處理或校驗異常,自動返回reject,并將消息刪除。
失敗重試機制
配置代碼如下?
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000ms # 初識的失敗等待時長為1秒
multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務中包含事務,這里改為false
重啟consumer服務,重復之前的測試??梢园l(fā)現(xiàn):
消費者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次,本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺,發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是reject。
結(jié)論:
開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試。
重試達到最大次數(shù)后,Spring會返回reject,消息會被丟棄?。
失敗重試策略?
在之前的測試中,本地測試達到最大重試次數(shù)后,消息會被丟棄。這在某些對于消息可靠性要求較高的業(yè)務場景下,顯然不太合適了。 因此Spring允許我們自定義重試次數(shù)耗盡后的消息處理策略,這個策略是由MessageRecovery接口來定義的,它有3個不同實現(xiàn):
1. RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式?
2. ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊?
3. RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機?
比較合適的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。
步驟:?在consumer服務配置類中定義處理失敗消息的交換機和隊列,并定義一個RepublishMessageRecoverer,關聯(lián)隊列和交換機
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
//設置當滿足某個條件(失敗重試策略被開啟)時配置生效↓
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
業(yè)務冪等性
何為冪等
冪等是一個數(shù)學概念,用函數(shù)表達式來描述是這樣的:f(x) = f(f(x))
,例如求絕對值函數(shù)。 在程序開發(fā)中,則是指同一個業(yè)務,執(zhí)行一次或多次對業(yè)務狀態(tài)的影響是一致的。
例如:根據(jù)id刪除數(shù)據(jù)和查詢數(shù)據(jù)操作,執(zhí)行一次或多次對業(yè)務狀態(tài)的影響是一致的。
但數(shù)據(jù)的更新往往不是冪等的,如果重復執(zhí)行可能造成不一樣的后果。
比如:取消訂單,恢復庫存的業(yè)務。如果多次恢復就會出現(xiàn)庫存重復增加的情況。
退款業(yè)務,重復退款對商家而言會有經(jīng)濟損失。
所以,我們要盡可能避免業(yè)務被重復執(zhí)行。然而在實際業(yè)務場景中,由于意外經(jīng)常會出現(xiàn)業(yè)務被重復執(zhí)行的情況,因此,我們必須想辦法保證消息處理的冪等性。
這里給出兩種方案:唯一消息ID、業(yè)務狀態(tài)判斷
唯一id
這個思路非常簡單:
每一條消息都生成一個唯一的id,與消息一起投遞給消費者。
消費者接收到消息后處理自己的業(yè)務,業(yè)務處理成功后將消息ID保存到數(shù)據(jù)庫
如果下次又收到相同消息,去數(shù)據(jù)庫查詢判斷是否存在,存在則為重復消息放棄處理。
我們該如何給消息添加唯一ID呢? 其實很簡單,SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開啟這個功能即可。 以Jackson的消息轉(zhuǎn)換器為例:
@Bean
public MessageConverter messageConverter(){
// 1.定義消息轉(zhuǎn)換器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自動創(chuàng)建消息id,用于識別不同消息,也可以在業(yè)務中基于ID判斷是否是重復消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
業(yè)務判斷
業(yè)務判斷就是基于業(yè)務本身的邏輯或狀態(tài)來判斷是否是重復的請求或消息,不同的業(yè)務場景判斷的思路也不一樣。 例如我們當前案例中,處理消息的業(yè)務邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務時判斷訂單狀態(tài)是否是未支付,如果不是則證明訂單已經(jīng)被處理過,無需重復處理。
相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫,所以我們更推薦使用業(yè)務判斷的方案。
以支付修改訂單的業(yè)務為例,我們需要修改OrderServiceImpl中的markOrderPaySuccess方法:
@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查詢訂單
Order old = getById(orderId);
// 2.判斷訂單狀態(tài)
if (old == null || old.getStatus() != 1) {
// 訂單不存在或者訂單狀態(tài)不是1,放棄處理
return;
}
// 3.嘗試更新訂單
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}
上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動作,因此在極小概率下可能存在線程安全問題。
我們可以合并上述操作為這樣:
@Override
public void markOrderPaySuccess(Long orderId) {
// 代碼等同于如下sql語句
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
我們在where條件中除了判斷id以外,還加上了status必須為1的條件。如果條件不符(說明訂單已支付),則SQL匹配不到數(shù)據(jù),根本不會執(zhí)行。
兜底方案
雖然我們利用各種機制盡可能增加了消息的可靠性,但也不好說能保證消息100%的可靠。萬一真的MQ通知失敗該怎么辦呢? 有沒有其它兜底方案,能夠確保訂單的支付狀態(tài)一致呢?
其實思想很簡單:既然MQ通知不一定發(fā)送到交易服務,那么交易服務就必須自己主動去查詢支付狀態(tài)。這樣即便支付服務的MQ通知失敗,我們依然能通過主動查詢來保證訂單狀態(tài)的一致。 流程如下:
?
圖中黃色線圈起來的部分就是MQ通知失敗后的兜底處理方案,由交易服務自己主動去查詢支付狀態(tài)。
不過需要注意的是,交易服務并不知道用戶會在什么時候支付,如果查詢的時機不正確(比如查詢的時候用戶正在支付中),可能查詢到的支付狀態(tài)也不正確。 那么問題來了,我們到底該在什么時間主動查詢支付狀態(tài)呢?
這個時間是無法確定的,因此,通常我們采取的措施就是利用定時任務定期查詢,例如每隔20秒就查詢一次,并判斷支付狀態(tài)。如果發(fā)現(xiàn)訂單已經(jīng)支付,則立刻更新訂單狀態(tài)為已支付即可。文章來源:http://www.zghlxwxcb.cn/news/detail-833068.html
總結(jié)
綜上,支付服務與交易服務之間的訂單狀態(tài)一致性是如何保證的?
首先,支付服務會正在用戶支付成功以后利用MQ消息通知交易服務,完成訂單狀態(tài)同步。
其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認機制、消費者確認、消費者失敗重試等策略,確保消息投遞的可靠性
最后,我們還在交易服務設置了定時任務,定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時任務作為兜底方案,確保訂單支付狀態(tài)的最終一致性。?文章來源地址http://www.zghlxwxcb.cn/news/detail-833068.html
?
到了這里,關于微服務—RabbitMQ高級(業(yè)務在各方面的可靠性)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!