1. 消息可靠性
每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:
- 發(fā)送時(shí)丟失:
-
- 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
- 消息到達(dá)exchange后未到達(dá)queue
- MQ宕機(jī),queue將消息丟失
- consumer接收到消息后未消費(fèi)就宕機(jī)
RabbitMQ分別給出了解決方案:
- 生產(chǎn)者發(fā)送確認(rèn)機(jī)制
- mq持久化
- 消費(fèi)者消費(fèi)確認(rèn)機(jī)制
- 失敗重試機(jī)制
1.1. 生產(chǎn)者消息確認(rèn)
RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。
這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。
返回結(jié)果有兩種方式:
- publisher-confirm,發(fā)送者確認(rèn)
-
- 消息成功投遞到交換機(jī),返回ack
- 消息未投遞到交換機(jī),返回nack
- publisher-return,發(fā)送者回執(zhí)
-
- 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。
注意:確認(rèn)機(jī)制在發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一的id,以區(qū)分不同的消息,避免ack沖突
1.1.1. 修改配置
修改publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:
spring:
rabbitmq:
# 啟用發(fā)送者確認(rèn),生產(chǎn)者到交換機(jī)
publisher-confirm-type: correlated
# 啟用發(fā)送者確認(rèn),從交換機(jī)到隊(duì)列
publisher-returns: true
# 啟用失敗的回調(diào)
template:
mandatory: true
說明:
- publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
-
- simple:同步等待confirm結(jié)果,直到超時(shí)
- correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
- publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
- template.mandatory:定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
1.1.2. 定義ConfirmCallback
對(duì)發(fā)送者把消息發(fā)送到交換機(jī)進(jìn)行確認(rèn)
在發(fā)送消息時(shí)指定
@Test
public void test() throws InterruptedException {
String exchanges = "itcast.direct";
String routingKey = "phone";
String message = "hello, spring amqp!";
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
//發(fā)送的通知回調(diào)
correlation.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息發(fā)送異常",ex);
}
//生產(chǎn)者正常把消息發(fā)出來了
@Override
public void onSuccess(CorrelationData.Confirm result) {
boolean ack = result.isAck();
if (ack){
log.info("交換正常收到消息");
}else {
log.info("交換機(jī)沒有收到消息");
}
}
});
rabbitTemplate.convertAndSend(exchanges, routingKey, message,correlation);
}
1.1.3. 定義Return回調(diào)
從交換機(jī)到隊(duì)列的確認(rèn)
每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目加載時(shí)配置:
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//發(fā)送到隊(duì)列失敗
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("交換機(jī)路由到隊(duì)列失敗,message:{} replyCode:{} replyText:{} exchange:{} routingKey:{}",
message, replyCode, replyText, exchange, routingKey);
}
});
}
1.2. 消息持久化
生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。
要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。
- 交換機(jī)持久化
- 隊(duì)列持久化
- 消息持久化
1.2.1. 交換機(jī)持久化
RabbitMQ中交換機(jī)默認(rèn)是持久化的。
SpringAMQP中可以通過代碼指定交換機(jī)持久化:
@Bean
public DirectExchange simpleExchange(){
// 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除
return new DirectExchange("simple.direct", true, false);
}
默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。
可以在RabbitMQ控制臺(tái)看到持久化的交換機(jī)都會(huì)帶上D的標(biāo)示:
1.2.2. 隊(duì)列持久化
RabbitMQ中隊(duì)列默認(rèn)是持久化的。
SpringAMQP中可以通過代碼指定交換機(jī)持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。
1.2.3. 消息持久化
利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。
1.3. 消費(fèi)者消息確認(rèn)
RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。
RabbitMQ是通過消費(fèi)者回執(zhí)來確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。
回執(zhí)分為3種類型:ack、nack、reject。如果消費(fèi)者返回ack,MQ會(huì)把消息從隊(duì)列刪除;如果返回nack或者reject,如果requeue是true的話,MQ會(huì)把消息重新投遞給消費(fèi)者,如果requeue是false的話,MQ會(huì)把消息刪掉。
1.3.1. 場(chǎng)景:
- 1)RabbitMQ投遞消息給消費(fèi)者
- 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費(fèi)者宕機(jī),消息尚未處理
這樣,消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。
SpringAMQP則允許配置三種確認(rèn)模式:
- manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。消息投遞是不可靠的,可能丟失。自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack。(不推薦)
- auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack,并且requeue是true。類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒有異常,返回ack。
- none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除(不推薦)
1.3.2. 演示none模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 關(guān)閉ack
修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
log.info("消費(fèi)者接收到simple.queue的消息:【{}】", msg);
// 模擬異常
System.out.println(1 / 0);
log.debug("消息處理完成!");
}
測(cè)試可以發(fā)現(xiàn),當(dāng)消息處理拋異常時(shí),消息依然被RabbitMQ刪除了。
1.3.3. 演示auto模式
把確認(rèn)機(jī)制修改為auto:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 關(guān)閉ack
在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):
拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:
1.4. 消費(fèi)失敗重試機(jī)制
當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:
1.4.1. 本地重試
可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制的requeue到mq隊(duì)列。
修改consumer服務(wù)的application.yml文件,添加內(nèi)容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費(fèi)者失敗重試
initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長為1秒
multiplier: 3 # 失敗的等待時(shí)長倍數(shù),下次等待時(shí)長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數(shù)
stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
第一次 1秒
第二次 3*1000 = 3秒
第三次 3*3000 = 9秒
第四次 3*9000 = 27秒
重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):
- 在重試3次后,SpringAMQP會(huì)拋出異常AmqpRejectAndDontRequeueException,說明本地重試觸發(fā)了
- 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是reject,并且不重新入隊(duì),mq刪除消息了
結(jié)論:
- 開啟本地重試時(shí),消息處理過程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,Spring會(huì)返回reject,消息會(huì)被丟棄
1.4.2. 失敗策略
在之前的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。
在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實(shí)現(xiàn):
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer
失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
// 定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
1.5. 總結(jié)
如何確保RabbitMQ消息的可靠性?
- 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
- 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
2. 死信交換機(jī)
2.1. 什么是死信交換機(jī)
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):
- 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
- 消息是一個(gè)過期消息,超時(shí)無人消費(fèi)
- 要投遞的隊(duì)列消息滿了,無法投遞
如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,檢查DLX)。
一個(gè)消息被消費(fèi)者拒絕了,變成了死信。因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會(huì)投遞給這個(gè)交換機(jī)。如果這個(gè)死信交換機(jī)也綁定了一個(gè)隊(duì)列,則消息最終會(huì)進(jìn)入這個(gè)存放死信的隊(duì)列。
隊(duì)列將死信投遞給死信交換機(jī)時(shí),必須知道兩個(gè)信息:
- 死信交換機(jī)名稱
- 死信交換機(jī)與死信隊(duì)列綁定的RoutingKey
2.2. TTL
一個(gè)隊(duì)列中的消息如果超時(shí)未消費(fèi),則會(huì)變?yōu)樗佬?,超時(shí)分為兩種情況:
- 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間
- 消息本身設(shè)置了超時(shí)時(shí)間
2.2.1. 接收超時(shí)死信的死信交換機(jī)
在consumer服務(wù)的SpringRabbitListener中,定義一個(gè)新的消費(fèi)者,并且聲明 死信交換機(jī)、死信隊(duì)列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.exchange"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}
2.2.2. 聲明一個(gè)隊(duì)列,并且指定TTL
要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置x-message-ttl屬性:
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化
.ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒
.deadLetterExchange("dl.ttl.exchange") // 指定死信交換機(jī)
.deadLetterRoutingKey("dl") // 指定死信交換機(jī)綁定key
.build();
}
這個(gè)隊(duì)列設(shè)定了死信交換機(jī)為dl.ttl.exchange
聲明交換機(jī),將ttl與交換機(jī)綁定:
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
發(fā)送消息,但是不要指定TTL:
@Test
public void testTTLQueue() {
// 創(chuàng)建消息
String message = "hello, ttl queue";
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 記錄日志
log.debug("發(fā)送消息成功");
}
發(fā)送消息的日志:
查看下接收消息的日志:
2.2.3. 發(fā)送消息時(shí),設(shè)定TTL
@Test
public void testTTLMsg() {
// 創(chuàng)建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發(fā)送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("發(fā)送消息成功");
}
這次,發(fā)送與接收的延遲只有5秒。說明當(dāng)隊(duì)列、消息都設(shè)置了TTL時(shí),任意一個(gè)到期就會(huì)成為死信。
2.2.4. 總結(jié)
消息超時(shí)的兩種方式是?
- 給隊(duì)列設(shè)置ttl屬性,進(jìn)入隊(duì)列后超過ttl時(shí)間的消息變?yōu)樗佬?/li>
- 給消息設(shè)置ttl屬性,隊(duì)列接收到消息超過ttl時(shí)間后變?yōu)樗佬?/li>
如何實(shí)現(xiàn)發(fā)送一個(gè)消息20秒后消費(fèi)者才收到消息?
- 給消息的目標(biāo)隊(duì)列指定死信交換機(jī)
- 將消費(fèi)者監(jiān)聽的隊(duì)列綁定到死信交換機(jī)
- 發(fā)送消息時(shí)給消息設(shè)置超時(shí)時(shí)間為20秒
死信交換機(jī)和ttl實(shí)現(xiàn)消息延遲發(fā)送的缺點(diǎn)?
- 存在隊(duì)頭阻塞問題
2.3. 延遲交換機(jī)
利用TTL結(jié)合死信交換機(jī),實(shí)現(xiàn)了消費(fèi)者延遲收到消息的效果。這種消息模式就稱為延遲隊(duì)列(Delay Queue)模式。
但是死信交換機(jī)存在隊(duì)頭阻塞問題,因此并不推薦使用。
因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了 DelayExchange 插件,原生支持延遲隊(duì)列效果。
2.3.1. 安裝DelayExchange插件
上傳插件
查看數(shù)據(jù)卷:
docker volume inspect mq-plugins
將插件上傳到這個(gè)目錄即可
安裝插件
進(jìn)入MQ容器內(nèi)部來執(zhí)行安裝。我的容器名為mq,所以執(zhí)行下面命令:
docker exec -it mq bash
進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.3.2. DelayExchange原理
2.3.2.1. 聲明DelayExchange交換機(jī)
插件的使用也非常簡單:聲明一個(gè)交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊(duì)列與其綁定即可。
基于注解方式(推薦):
也可以基于@Bean的方式:
2.3.2.2. 發(fā)送消息
發(fā)送消息時(shí),一定要攜帶x-delay屬性,指定延遲的時(shí)間:
2.3.3. 總結(jié)
延遲隊(duì)列插件的使用步驟包括哪些?
- 聲明一個(gè)交換機(jī),添加delayed屬性為true
- 發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間
3. 惰性隊(duì)列
3.1. 消息堆積問題
當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問題。
解決消息堆積有兩種思路:
- 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說的work queue模式
- 擴(kuò)大隊(duì)列容積,提高堆積上限
3.2. 惰性隊(duì)列
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:
- 接收到消息后直接存入磁盤而非內(nèi)存
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬條的消息存儲(chǔ)
3.2.1. 基于命令行設(shè)置lazy-queue
而要設(shè)置一個(gè)隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定x-queue-mode屬性為lazy即可。
可以通過命令行將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
- rabbitmqctl :RabbitMQ的命令行工具
- set_policy :添加一個(gè)策略
- Lazy :策略名稱,可以自定義
- "^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字
- '{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式
- --apply-to queues:策略的作用對(duì)象,是所有的隊(duì)列
3.2.2. 基于@Bean聲明lazy-queue
3.2.3. Queue的管理控制臺(tái)
ready:已發(fā)送到隊(duì)列,但還未發(fā)送給消費(fèi)者的消息,也就是僅執(zhí)行了publish的消息。
unacked:已發(fā)送給消費(fèi)者但還未收到消費(fèi)者ack的信息,即執(zhí)行了publish和delivery的消息。
total:ready與unacked消息的總和。
in memory:表示在內(nèi)存中進(jìn)行緩存的消息數(shù)。
persistent:表示在磁盤上存儲(chǔ)的持久化的消息數(shù)。
transient, paged out:表示非持久化的消息,但實(shí)際存儲(chǔ)到磁盤上的消息數(shù)(可能因?yàn)閮?nèi)存達(dá)到一定水位觸發(fā)的置換,也可能是隊(duì)列為lazy模式,即便是非持久化的消息也存儲(chǔ)在磁盤上了)
3.2.4. 總結(jié)
消息堆積問題的解決方案?
- 隊(duì)列上綁定多個(gè)消費(fèi)者,提高消費(fèi)速度
- 使用惰性隊(duì)列,可以再mq中保存更多消息
惰性隊(duì)列的優(yōu)點(diǎn)有哪些?文章來源:http://www.zghlxwxcb.cn/news/detail-786769.html
- 基于磁盤存儲(chǔ),消息上限高
- 沒有間歇性的page-out,性能比較穩(wěn)定
惰性隊(duì)列的缺點(diǎn)有哪些?文章來源地址http://www.zghlxwxcb.cn/news/detail-786769.html
- 基于磁盤存儲(chǔ),消息時(shí)效性會(huì)降低
- 性能受限于磁盤的IO
到了這里,關(guān)于服務(wù)異步通信-高級(jí)篇(RabbitMQ)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!