RabbitMQ 的作用
- 提供了系統(tǒng)之間的異步調(diào)用,比如一個(gè)支付功能,用戶在支付完成之后,會(huì)去數(shù)據(jù)庫(kù)中執(zhí)行后續(xù)操作,然后更新支付狀態(tài),會(huì)生成訂單信息,如果后續(xù)還需要添加功能,就需要去業(yè)務(wù)邏輯中修改代碼,這樣就會(huì)出現(xiàn)業(yè)務(wù)耦合。同時(shí)想要執(zhí)行后續(xù)操作,需要等待支付功能完成,在此等待過(guò)程中會(huì)耗費(fèi)時(shí)間,CPU空轉(zhuǎn),性能比較差。當(dāng)業(yè)務(wù)中有操作失敗,就會(huì)將全部操作回滾。如果下一個(gè)操作依賴上一個(gè)操作時(shí)就需要用到同步操作。但是后續(xù)的很多業(yè)務(wù)操作只需要知道支付成功之后就去執(zhí)行,不需要等待其他業(yè)務(wù)執(zhí)行完成之后再去執(zhí)行。同步操作的時(shí)效性強(qiáng),但是拓展性差,并且性能下降還會(huì)出現(xiàn)級(jí)聯(lián)失敗等問(wèn)題。
- 異步調(diào)用的方式就是基于消息通知的方式,其中有三個(gè)角色:消息發(fā)送者、消費(fèi)代理、消息接收者。微信消息發(fā)送、送外賣。支付服務(wù)就不在同步調(diào)用業(yè)務(wù)關(guān)聯(lián)度低的服務(wù),而是發(fā)送消息通知Broker,這樣做具有以下優(yōu)勢(shì)
- 解除耦合,拓展性強(qiáng)。
- 無(wú)需等待,性能好。
- 故障隔離:當(dāng)某一個(gè)業(yè)務(wù)接收服務(wù)宕機(jī),其他的服務(wù)可以正常執(zhí)行,這個(gè)服務(wù)重連之后只需要去MQ中去獲取數(shù)據(jù)就行。
- 緩存消息,削峰填谷作用:當(dāng)突然有大量的支付請(qǐng)求過(guò)來(lái)后,不會(huì)第一時(shí)間去沖擊數(shù)據(jù)庫(kù),而是存放在MQ中,根據(jù)業(yè)務(wù)處理的速度自己去取,業(yè)務(wù)服務(wù)壓力就很小。
- 異步調(diào)用的問(wèn)題:
- 不能立刻得到調(diào)用結(jié)果,時(shí)效性差。
- 不確定下游業(yè)務(wù)是否執(zhí)行成功。
- 業(yè)務(wù)安全依賴于Broker的可靠性。
為什么使用RabbitMQ
MQ就是MessageQueue,存放消息的隊(duì)列,也就是異步調(diào)用中的Broker。
在日常開(kāi)發(fā)過(guò)程中,常見(jiàn)的消息隊(duì)列有四種,RabbitMQ、ActiveMQ、RocketMQ、Kafka。 這四中的對(duì)比性下圖可以看到,其中RabbitMQ是Rabbit公司專門(mén)研究的,相較于其他消息中間件它支持SMTP協(xié)議,并且它的消息延遲更是達(dá)到了恐怖的微秒級(jí)。當(dāng)然它的消息可靠性以及可用性也是非常高的,所以一般項(xiàng)目開(kāi)發(fā)沒(méi)有特殊要求都是使用的是RabbitMQ。
數(shù)據(jù)隔離
交換機(jī)和隊(duì)列都有自己的VirtualHost,不同的VirtualHost都有自己不同的交換機(jī)和隊(duì)列。一個(gè)MQ中可以有多個(gè)VirtualHost,在發(fā)消息的時(shí)候去連接對(duì)應(yīng)的VirtualHost就行。每個(gè)user可以去操作自己創(chuàng)建的的VirtualHost,查看的話時(shí)根據(jù)管理員創(chuàng)建user時(shí)分配的權(quán)限決定。
SpringAMQP
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: # 主機(jī)名
port: 5672 # 端口
virtual-host: / # 虛擬主機(jī)
username: # 用戶名
password: # 密碼
加入RabbitMQ依賴,在yml文件中配置,然后通過(guò)RabbitTemple向隊(duì)列中發(fā)送消息。
work模式
默認(rèn)情況下,RabbitMQ會(huì)將消息依次輪詢投遞給綁定在隊(duì)列上的每一個(gè)消費(fèi)者。并沒(méi)有考慮到消費(fèi)者是否已經(jīng)處理完消息,這種情況可能出現(xiàn)的問(wèn)題就是當(dāng)我們不知道消費(fèi)者消費(fèi)能力的時(shí)候容易出現(xiàn)消息堆積。比如此時(shí)有兩個(gè)消費(fèi)者,消費(fèi)者a一秒鐘可以處理50條數(shù)據(jù),消費(fèi)者b一秒鐘只能處理5條數(shù)據(jù),此時(shí)有1000條消息發(fā)送到隊(duì)列中,一次輪詢綁定消費(fèi)者,每一個(gè)消費(fèi)者綁定了500個(gè)數(shù)據(jù),但是消費(fèi)者a10秒鐘就處理完成,此時(shí)消費(fèi)者b還在處理消息,a此時(shí)就空閑著,可用性比較低。
因此我們需要在yml中設(shè)置prefetch值為1,確保同一時(shí)刻最多投遞給消費(fèi)者1條消息,處理完之后才能獲取下一條消息。
Rabbitmq:
listener:
simple:
prefetch: 1
交換機(jī)
交換機(jī)主要分為三種類型:Fanout(廣播)、Direct(定向)、Topic(話題)。
- Fanout:Fanout Exchange會(huì)將接收到的消息廣播到每一個(gè)跟其綁定的queue中,所以也叫廣播模式。
- Direct:Direct Exchange會(huì)將接收到的消息根據(jù)規(guī)則路由到指定的Queue,因此稱為定向路由。
- 每一個(gè)Queue都與Exchange設(shè)置一個(gè)BindingKey。
- 發(fā)布者發(fā)布消息時(shí),指定消息的RoutingKey。
- Exchange將消息路由到BindingKey與消息BindingKey一致的隊(duì)列。
- Topic:TopicExchange與DirectExchange類似,特殊之處在于
- routingKey可以是多個(gè)單詞的列表,并且以 . 分割。
- Queue與Exchange指定BinddingKey時(shí)可以使用通配符:
- #:代指0個(gè)或多個(gè)單詞。
- *:代指一個(gè)單詞。
如何聲明隊(duì)列和交換機(jī)
1. Spring AMQP提供了幾個(gè)類,用來(lái)聲明隊(duì)列、交換機(jī)以及其綁定關(guān)系。
- Queue:用于聲明隊(duì)列,可以用工廠類QueueBuilder構(gòu)建。
- Exchange:用于聲明交換機(jī),可以用工廠類ExchangeBuilder構(gòu)建。
- Binding:用于聲明隊(duì)列和交換機(jī)的綁定關(guān)系,可以用工廠類BindingBuilder構(gòu)建。
@Bean
public FanoutExchange fanoutExchange(){
// ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("shuqg.fanout2");
}
@Bean
public Queue fanoutQueue3(){
// QueueBuilder.durable("").build();
return new Queue("shuqg.queue3");
}
@Bean
public Binding fanoutBinging3(Queue fanoutqueue3, FanoutExchange fanoutExchange){
// 如果需要綁定bindingkey在后面.with("")
return BindingBuilder.bind(fanoutqueue3).to(fanoutExchange);
}
2. 基于注解聲明
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true"),
exchange = @Exchange(name = "shuqg.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) throws InterruptedException {
System.out.println("消費(fèi)者2 收到了 direct.queue2的消息:【" +msg+ "】");
Thread.sleep(200);
}
消息轉(zhuǎn)換器
-
Spring對(duì)消息處理默認(rèn)實(shí)現(xiàn)的是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
-
但是存在以下問(wèn)題,JDK的序列化有安全風(fēng)險(xiǎn)、JDK序列化的消息太大、JDK序列化的消息可讀性差。
- 在傳輸put類型消息的時(shí)候,RabbitMQ默認(rèn)會(huì)將消息序列化轉(zhuǎn)換為字節(jié)碼,但是可讀性非常差,原本非常短的一個(gè)消息變得非常大,并且有亂碼風(fēng)險(xiǎn)。
-
建議采用JSON序列化代替默認(rèn)序列化,在SpringAMQP中有JSON的接口,只不過(guò)沒(méi)有生效,我們只需要引入JSON依賴。
<!--JSON-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
然后在publisher和consumer中都要配置MessageConverter
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
生產(chǎn)者重連
有時(shí)候由于網(wǎng)絡(luò)波動(dòng),可能會(huì)出現(xiàn)客戶端連接MQ失敗的情況。我們可以通過(guò)配置開(kāi)啟失敗后的重連機(jī)制。 當(dāng)網(wǎng)絡(luò)不穩(wěn)定時(shí),使用重試機(jī)制可以有效提高消息發(fā)送成功概率。不過(guò)SpringAMQP消息重試機(jī)制是阻塞式的重試,也就是多次重試等待過(guò)程中,線程是被阻塞的,會(huì)影響業(yè)務(wù)性能。如果對(duì)業(yè)務(wù)性能有要求,建議禁用重試機(jī)制,如果要使用就合理配置等待時(shí)長(zhǎng)和重試次數(shù),也可以考慮使用異步線程來(lái)執(zhí)行發(fā)送消息的代碼。
rabbitmq:
template:
retry:
enabled: true # 開(kāi)啟超時(shí)重試機(jī)制
initial-interval: 1000ms # 失敗后的初始等待時(shí)間
multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = initial-interval * multiplier
max-attempts: 3 # 最大重試次數(shù)
生產(chǎn)者確認(rèn)
RabbitMQ有Publisher Confirm和Publisher Return兩種確認(rèn)機(jī)制。開(kāi)啟確認(rèn)機(jī)制后,在MQ成功發(fā)送消息后返回確認(rèn)消息給生產(chǎn)者。
- 消息投遞到了交換機(jī),但是路由失敗。此時(shí)會(huì)通過(guò)PublisherReturn返回路由異常的原因,然后返回ACK,告知投遞成功,此時(shí)消息成功發(fā)送到了交換機(jī)中,但是路由失敗的原因可能是交換機(jī)沒(méi)有關(guān)聯(lián)隊(duì)列或者交換機(jī)沒(méi)有BindingKey與隊(duì)列相匹配。
- 如果臨時(shí)消息(未開(kāi)啟持久化non durable)投遞到了MQ,并且入隊(duì)成功,返回ack,表示投遞成功。
- 如果持久消息(開(kāi)啟了持久化durable)投遞到了MQ,并且入隊(duì)完成持久化,返回ack,表示投遞成功。
- 其他情況都會(huì)返回nack,出現(xiàn)nack可能的情況有
- 如果消息投遞到交換機(jī)失敗,會(huì)通過(guò)Publisher Confirm返回nack,表示消息投遞失敗,這種情況一般很少發(fā)生,如果發(fā)生就要不是代碼寫(xiě)的有問(wèn)題,要不就是交換機(jī)的配置有問(wèn)題。
- 消息投遞到隊(duì)列時(shí)隊(duì)列已滿。
配置生產(chǎn)者確認(rèn)機(jī)制
rabbitmq:
publisher-confirm-type: correlated # 開(kāi)啟publisher-confirm機(jī)制,并設(shè)置confirm類型
# 這里有三種參數(shù),默認(rèn)none關(guān)閉,其次simple是同步阻塞等待MQ回執(zhí)消息,然后是correlated是MQ異步回調(diào)方式返回回執(zhí)消息。
publisher-returns: true # 開(kāi)啟publisher return機(jī)制
每一個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目啟動(dòng)過(guò)程中配置
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
replyCode, replyText,exchange, routingKey, message.toString());
});
}
}
生產(chǎn)者發(fā)送消息
@Test
void testConfirmCallback() throws InterruptedException {
// 創(chuàng)建cd
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息回調(diào)失敗", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
log.debug("收到confirm callback回執(zhí)");
if (result.isAck()) {
// 消息發(fā)送成功
log.debug("消息發(fā)送成功,返回ack");
}else{
// 消息發(fā)送失敗
log.error("消息發(fā)送失敗,返回nack,原因{}", result.getReason());
}
}
});
rabbitTemplate.convertAndSend("shuqg.direct", "red","hello", cd);
Thread.sleep(2000);
}
生產(chǎn)者確認(rèn)需要額外的網(wǎng)絡(luò)和系統(tǒng)資源開(kāi)銷,盡量不要使用。如果一定要使用,無(wú)需開(kāi)啟Publisher-Return機(jī)制,因?yàn)橐话懵酚墒∈亲约簶I(yè)務(wù)的問(wèn)題。對(duì)于nack消息可以設(shè)置有限的重試次數(shù),依然失敗則記錄異常消息到日志中。
MQ持久化
RabbitMQ如何保證消息可靠性
- 首先通過(guò)配置可以讓交換機(jī)、隊(duì)列、以及發(fā)送的消息都持久化。這樣隊(duì)列中的消息會(huì)持久化到磁盤(pán),MQ重啟消息依然存在。
- RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后會(huì)稱為隊(duì)列的默認(rèn)模式。LazyQueue會(huì)將所有消息都持久化
- 開(kāi)啟持久化和生產(chǎn)者確認(rèn)時(shí),RabbitMQ只有在消息持久化完成后才會(huì)給生產(chǎn)者返回ACK回執(zhí)
在默認(rèn)情況下,RabbitMQ會(huì)將接收到的消息保存到內(nèi)存中以降低消息收發(fā)的延遲。這樣會(huì)導(dǎo)致兩個(gè)問(wèn)題:
- 一旦MQ宕機(jī),內(nèi)存中的消息會(huì)丟失。
- 內(nèi)存空間有限,當(dāng)消費(fèi)者故障或處理過(guò)慢,會(huì)導(dǎo)致消息積壓,引發(fā)MQ阻塞。
- 可以通過(guò)數(shù)據(jù)持久化(mq3.6以前)和Lazy Queue(mq3.6以后)去解決這兩個(gè)問(wèn)題。
數(shù)據(jù)持久化
- 交換機(jī)持久化:創(chuàng)建交換機(jī)的時(shí)候如果勾選Transient就是臨時(shí)的,但是我們平時(shí)都是勾選Durable是要保證交換機(jī)持久化的,mq重啟之后交換機(jī)不會(huì)消失。
- 隊(duì)列持久化:創(chuàng)建隊(duì)列的時(shí)候也是同理,默認(rèn)的是Druable持久化的,不然mq重啟之后隊(duì)列就會(huì)丟失。在Spring中創(chuàng)建交換機(jī)和隊(duì)列的時(shí)候默認(rèn)就是持久化的。
LazyQueue
從mq3.6之后開(kāi)始增加了LazyQueue的概念,也就是惰性隊(duì)列。惰性隊(duì)列有以下特點(diǎn):
- 接收到消息之后直接存入磁盤(pán)而非內(nèi)存(內(nèi)存中只保留最近的消息,默認(rèn)2048條)
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存。
- 支持?jǐn)?shù)百萬(wàn)條的數(shù)據(jù)存儲(chǔ)。
- 在3.12版本之后,所有的隊(duì)列都是LazyQueue模式,無(wú)法更改。
消費(fèi)者的可靠性
1. 消費(fèi)者確認(rèn)機(jī)制
為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement),當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)。
- ack:成功處理消息,RabbitMQ從隊(duì)列中刪除該消息。
- nack:消息處理失敗,RabbitMQ重新發(fā)送消息。
- reject:消息處理失敗并拒絕該消息,RabbitMQ從隊(duì)列中刪除該消息。
開(kāi)啟消費(fèi)者確認(rèn)機(jī)制為auto,由Spring確認(rèn)消息處理成功后返回ack。開(kāi)啟消費(fèi)者確認(rèn)機(jī)制,RabbitMQ支持消費(fèi)者確認(rèn)機(jī)制,當(dāng)消費(fèi)者處理消息之后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)之后才會(huì)去刪除該消息。 SpringAMQP中允許配置三種確認(rèn)模式:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none # none, manual手動(dòng), auto自動(dòng)
- none:默認(rèn)情況,不處理,即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用。
- manual:手動(dòng)模式,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack或reject,存在業(yè)務(wù)入侵,但是更靈活。
- auto(一般選擇這種):自動(dòng)模式,由Spring監(jiān)聽(tīng)listener代碼是否出現(xiàn)異常,當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack.當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
- 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack。
- 如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject。
- 當(dāng)消費(fèi)者異常返回時(shí),我們可以開(kāi)啟消費(fèi)者失敗重試機(jī)制,利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,設(shè)置重試次數(shù),多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理。
2. 消費(fèi)失敗問(wèn)題
當(dāng)消費(fèi)者出現(xiàn)異常后,會(huì)不斷requeue(重新入隊(duì)到隊(duì)列),再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無(wú)限循環(huán),導(dǎo)致mq消息處理飆升,帶來(lái)不必要的壓力。 我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。
- 消息失敗后處理策略:在開(kāi)始重試模式后,默認(rèn)情況下報(bào)錯(cuò)三次,也就是重試三次就會(huì)放棄,此時(shí)需要使用MessageRecoverer接口來(lái)處理,包含三種實(shí)現(xiàn)
- RejectAndDontRequeueRecoverer:重試耗盡后,直接丟棄消息。默認(rèn)的就是這種方式。
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)。
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定交換機(jī)。
- 首先將失敗處理策略改為第三種RepublishMessageRecoverer。
- 然后定義接收失敗消息的交換機(jī)、隊(duì)列及其綁定關(guān)系。
- 然后定義RepublishMessageRecoverer。
3. 業(yè)務(wù)冪等性
冪等性是一個(gè)數(shù)學(xué)概念,就是f(x) = f(f(x)),在程序開(kāi)發(fā)中,指的是同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。重復(fù)消費(fèi)問(wèn)題。
-
查詢刪除這些業(yè)務(wù)天生就是冪等的,新增修改這些業(yè)務(wù)就不是冪等的。
-
給每一個(gè)消息都設(shè)置一個(gè)唯一id,利用id判斷是否重復(fù)消費(fèi)
-
每一個(gè)消息都生成一個(gè)唯一id,與消息一起投遞給消費(fèi)者。
-
消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息id保存到數(shù)據(jù)庫(kù)中。
-
如果下次又收到相同的消息,去數(shù)據(jù)庫(kù)查詢判斷是否存在,存在則為重復(fù)消息,放棄處理。
-
使用自帶的Jackson2JsonMessageConverter,可以實(shí)現(xiàn)自動(dòng)生成唯一id,當(dāng)將CreateMessageIds設(shè)置為true,底層會(huì)自動(dòng)創(chuàng)建唯一id,并返回。
-
@Bean public MessageConverter jacksonMessageConverter(){ // 定義消息轉(zhuǎn)換器 Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter(); // 配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于id判斷是否重復(fù)消息 jjmc.setCreateMessageIds(true); return jjmc; }
-
-
業(yè)務(wù)判斷
- 結(jié)合業(yè)務(wù)邏輯,基于業(yè)務(wù)本身做判斷。以支付修改訂單業(yè)務(wù)為例,我們要在支付后修改訂單狀態(tài)為已支付,應(yīng)該在修改訂單狀態(tài)前先查詢訂單狀態(tài),判斷狀態(tài)是否未支付。只有未支付訂單才需要修改,其他狀態(tài)不做處理。
如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性
使用MQ完成訂單狀態(tài)同步->為了保證mq可靠,使用了生產(chǎn)者確認(rèn),消費(fèi)者確認(rèn),生產(chǎn)者重試,同時(shí)開(kāi)啟mq持久化,最后做了冪等性判斷。
如何保證消息不丟失
-
可能導(dǎo)致消息丟失的場(chǎng)景:生產(chǎn)者發(fā)送消息沒(méi)有到達(dá)交換機(jī)或者沒(méi)有到達(dá)隊(duì)列,MQ宕機(jī),消費(fèi)者服務(wù)宕機(jī)。
-
開(kāi)啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列。RabbitMQ中提供了一個(gè)確認(rèn)機(jī)制用來(lái)避免消息發(fā)送到MQ過(guò)程中丟失,消息發(fā)送到MQ之后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。
- 如果消息發(fā)送到交換機(jī)失敗,交換機(jī)會(huì)返回一個(gè)nack,如果是發(fā)送到MQ失敗會(huì)返回一個(gè)ack。
- 消息失敗之后,回調(diào)方法重新發(fā)送消息,如果還是失敗,可以記錄到日志中通過(guò)查看日志進(jìn)行補(bǔ)充,或者將失敗的消息記錄到數(shù)據(jù)庫(kù)中,做一個(gè)定時(shí)發(fā)送任務(wù),發(fā)送成功之后刪除數(shù)據(jù)庫(kù)中的數(shù)據(jù)。
-
開(kāi)啟消息持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失。MQ默認(rèn)是在內(nèi)存中存儲(chǔ)消息,開(kāi)啟持久化功能可以將數(shù)據(jù)存儲(chǔ)在磁盤(pán)上,即使MQ宕機(jī)或重啟也不會(huì)丟失數(shù)據(jù)。
- 持久化交換機(jī)
- 持久化隊(duì)列
- 持久化消息
-
開(kāi)啟消費(fèi)者確認(rèn)機(jī)制為auto,由Spring確認(rèn)消息處理成功后返回ack。
-
開(kāi)啟消費(fèi)者確認(rèn)機(jī)制,RabbitMQ支持消費(fèi)者確認(rèn)機(jī)制,當(dāng)消費(fèi)者處理消息之后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)之后才會(huì)去刪除該消息。SpringAMQP中允許配置三種確認(rèn)模式:
rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none # none, manual手動(dòng), auto自動(dòng)
- none:默認(rèn)情況,不處理,即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用。
- manual:手動(dòng)模式,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack或reject,存在業(yè)務(wù)入侵,但是更靈活。
- auto(一般選擇這種):自動(dòng)模式,由Spring監(jiān)聽(tīng)listener代碼是否出現(xiàn)異常,當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack.當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
- 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack。
- 如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject。
- 當(dāng)消費(fèi)者異常返回時(shí),我們可以開(kāi)啟消費(fèi)者失敗重試機(jī)制,利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,設(shè)置重試次數(shù),多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理。
消息重復(fù)消費(fèi)問(wèn)題
- 重復(fù)消費(fèi)發(fā)生的地方:在消費(fèi)者消費(fèi)隊(duì)列中的消息的時(shí)候會(huì)向隊(duì)列中返回ack,此時(shí)如果因?yàn)榫W(wǎng)絡(luò)問(wèn)題或者隊(duì)列宕機(jī),沒(méi)有收到消費(fèi)者的ack,重連之后會(huì)重試機(jī)制導(dǎo)致重復(fù)消費(fèi)問(wèn)題。
- 解決方法:每條消息設(shè)置一個(gè)唯一的標(biāo)識(shí)id,當(dāng)消費(fèi)者接收到消息時(shí)去校驗(yàn)這個(gè)業(yè)務(wù)id是否存在,根據(jù)這個(gè)id去表中查詢,如果id不存在則正常去接收消息,如果id已經(jīng)存在了就證明這個(gè)消息已經(jīng)消費(fèi)過(guò)了,就不需要去消費(fèi)了,這樣就解決了重復(fù)消費(fèi)的問(wèn)題。
- 冪等方案:分布式鎖,數(shù)據(jù)庫(kù)鎖(悲觀鎖、樂(lè)觀鎖),但是加鎖的化性能會(huì)大大降低,如果數(shù)據(jù)庫(kù)中有唯一標(biāo)識(shí)id,則優(yōu)先采用第一種方案。
RabbitMQ中死信交換機(jī)?延遲隊(duì)列了解哪些?
-
一般使用在下單的時(shí)候,當(dāng)下單之后當(dāng)下單之后會(huì)有一個(gè)過(guò)期的時(shí)間,當(dāng)在指定時(shí)間內(nèi)未支付,就會(huì)將這個(gè)訂單銷毀。如果使用定時(shí)任務(wù),設(shè)置key value在redis中設(shè)置過(guò)期時(shí)間,我們需要定時(shí)去查詢數(shù)據(jù)庫(kù)中用戶支付狀態(tài),如果到達(dá)過(guò)期時(shí)間還沒(méi)有支付,就會(huì)刪除訂單表,這個(gè)時(shí)候,如果設(shè)置時(shí)間間隔較短,對(duì)數(shù)據(jù)庫(kù)的壓力會(huì)非常巨大,但是如果設(shè)置間隔時(shí)間較長(zhǎng),就會(huì)導(dǎo)致時(shí)效性較差。
-
延遲隊(duì)列就是進(jìn)入隊(duì)列的消息會(huì)被延遲消費(fèi)的隊(duì)列,我們當(dāng)時(shí)的某一個(gè)業(yè)務(wù)使用到了延遲隊(duì)列(超時(shí)訂單、限時(shí)優(yōu)惠、定時(shí)發(fā)布。。)
-
其中延遲隊(duì)列就用到了死信交換機(jī)和TTL實(shí)現(xiàn)的。
-
當(dāng)隊(duì)列中的消息滿足下面情況之一,就可以成為死信
- 消息消費(fèi)失敗,返回nack,并且請(qǐng)求參數(shù)為false。
- 消息超時(shí)未消費(fèi)(設(shè)置TTL)。設(shè)置TTL一般有兩種方式(哪個(gè)存活時(shí)間短以哪個(gè)為準(zhǔn))
- 消息所在隊(duì)列設(shè)置了存活時(shí)間。
- 消息本身設(shè)置了存活時(shí)間。
- 要傳遞的隊(duì)列消息堆積滿了,最早的消息可能成為死信。
-
一般死信消息是會(huì)被直接丟棄的,但是我們可以給該隊(duì)列配置一個(gè)dead-letter-exchange屬性,指定一個(gè)交換機(jī),隊(duì)列中的死信就會(huì)投遞到該交換機(jī)中,這個(gè)交換機(jī)就是死信交換機(jī)。這個(gè)交換機(jī)也可以綁定一個(gè)隊(duì)列,死信消息可以直接從交換機(jī)投遞到該隊(duì)列中,其他消費(fèi)者可以去消費(fèi)該隊(duì)列中的消息。
-
RabbitMQ中有一個(gè)延遲隊(duì)列插件實(shí)現(xiàn)延遲隊(duì)列DelayExchange文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-832794.html
- 聲明一個(gè)交換機(jī),添加delayed屬性為true,這個(gè)就是一個(gè)可以實(shí)現(xiàn)延遲隊(duì)列的交換機(jī)。
- 發(fā)送消息時(shí),通過(guò)消息頭x-delay,設(shè)置消息存活時(shí)間。
消息堆積問(wèn)題怎么解決
產(chǎn)生消息堆積的情況,當(dāng)生產(chǎn)者發(fā)送消息的速度超過(guò)了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信??赡軙?huì)被丟棄,這就是消息堆積。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-832794.html
- 增加更多消費(fèi)者,提高消費(fèi)速度。
- 在消費(fèi)者內(nèi)開(kāi)啟線程池加快消息處理速度。消費(fèi)者只負(fù)責(zé)去接收消息,所有的處理消息,處理業(yè)務(wù)邏輯都交給線程池去處理,但是線程池的作用是最大程度的利用CPU的資源,需要根據(jù)硬件配置去設(shè)置線程池。
- 擴(kuò)大隊(duì)列容積,提高堆積上限,采用惰性隊(duì)列,在聲明隊(duì)列的時(shí)候可以設(shè)置屬性x-queue-mode為lazy,即為惰性隊(duì)列。使用惰性隊(duì)列的好處是:
- 接收到消息后直接存入磁盤(pán)而非內(nèi)存,消息的上限比較高。
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存。
- 支持?jǐn)?shù)百萬(wàn)條消息存儲(chǔ)。
- 性能比較穩(wěn)定,但基于磁盤(pán)存儲(chǔ),受限于磁盤(pán)IO,時(shí)效性會(huì)降低。
RabbitMQ高可用機(jī)制
- 普通集群,又叫標(biāo)準(zhǔn)集群,這個(gè)集群中每一個(gè)節(jié)點(diǎn)都有同一個(gè)交換機(jī)的信息,每個(gè)節(jié)點(diǎn)都有不同的隊(duì)列,但是其他節(jié)點(diǎn)會(huì)有隊(duì)列的引用信息。
- 會(huì)在集群的各個(gè)節(jié)點(diǎn)間共享部分?jǐn)?shù)據(jù),包含交換機(jī)、隊(duì)列元信息。但是不包含隊(duì)列中的消息。
- 當(dāng)訪問(wèn)集群中某節(jié)點(diǎn)時(shí),如果隊(duì)列不在該節(jié)點(diǎn),會(huì)從數(shù)據(jù)所在節(jié)點(diǎn)傳遞到當(dāng)前節(jié)點(diǎn)并返回。
- 隊(duì)列所在節(jié)點(diǎn)宕機(jī),隊(duì)列中的消息就會(huì)丟失。
- 鏡像集群,本質(zhì)是主從模式
- 交換機(jī)、隊(duì)列、隊(duì)列中的消息會(huì)在各個(gè)mq鏡像節(jié)點(diǎn)之間同步備份。
- 創(chuàng)建隊(duì)列的節(jié)點(diǎn)是該隊(duì)列的主節(jié)點(diǎn),備份到其他節(jié)點(diǎn)的該隊(duì)列是該隊(duì)列的鏡像節(jié)點(diǎn)。
- 鏡像隊(duì)列結(jié)構(gòu)是一主多從(從就是鏡像),所有操作都是主節(jié)點(diǎn)完成,然后同步給鏡像節(jié)點(diǎn)。
- 主節(jié)點(diǎn)宕機(jī)后,鏡像節(jié)點(diǎn)會(huì)替代成為新的主節(jié)點(diǎn)(如果在主從同步前主節(jié)點(diǎn)就已經(jīng)宕機(jī),可能會(huì)出現(xiàn)數(shù)據(jù)丟失)
- 如果擔(dān)心出現(xiàn)數(shù)據(jù)丟失我們可以采用仲裁隊(duì)列替代鏡像隊(duì)列,與鏡像隊(duì)列一樣,都是主從模式,支持主從數(shù)據(jù)同步,主從協(xié)議基于Raft協(xié)議,是強(qiáng)一致性的,并且使用起來(lái)也非常簡(jiǎn)單,不需要額外的配置,在聲明隊(duì)列的時(shí)候只需要指定這個(gè)是仲裁隊(duì)列即可。
到了這里,關(guān)于RabbitMQ入門(mén)指南的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!