目錄
一、消息丟失
1、生產(chǎn)者重連
2、生產(chǎn)者確認(rèn)
3、數(shù)據(jù)持久化
4、惰性隊(duì)列
5、消費(fèi)失敗處理
二、消息重復(fù)
1、通過(guò)業(yè)務(wù)保證冪等性(優(yōu)先)
2、通過(guò)消息狀態(tài)去重保證冪等性
三、消息堆積
1、優(yōu)化消費(fèi)者處理邏輯
2、增加隊(duì)列及消費(fèi)者數(shù)量
3、使用惰性隊(duì)列
四、保證消息順序消費(fèi)
一、消息丟失
1、生產(chǎn)者重連
由于網(wǎng)絡(luò)波動(dòng),可能會(huì)出現(xiàn)客戶端連接失敗的情況,需開(kāi)啟重連機(jī)制。
SpringBoot項(xiàng)目配置:
spring:
rabbitmq:
# 連接超時(shí)時(shí)間
connection-timeout: 500ms
template:
retry:
# 開(kāi)啟失敗重連
enabled: true
# 失敗后重連初始間隔時(shí)間
initial-interval: 1000ms
# 失敗后下次間隔的時(shí)長(zhǎng)倍數(shù),下次間隔時(shí)長(zhǎng)=本次間隔時(shí)長(zhǎng)*multiplier
multiplier: 1
# 最大重試次數(shù)
max-attempts: 3
注意:重試過(guò)程線程是被阻塞的,合理配置等待時(shí)長(zhǎng)及最大重試次數(shù),或開(kāi)啟異步線程執(zhí)行,以免影響業(yè)務(wù)性能。
2、生產(chǎn)者確認(rèn)
SpringBoot項(xiàng)目配置:
spring:
rabbitmq:
# 開(kāi)啟生產(chǎn)者確認(rèn)
publisher-confirm-type: correlated
# 返回路由失敗消息,一般是開(kāi)發(fā)問(wèn)題,無(wú)需開(kāi)啟
publisher-returns: true
?publisher-confirm-type三種模式:
none | 關(guān)閉 |
simple | 同步阻塞等待MQ回執(zhí)消息 |
correlated(推薦) | MQ異步回調(diào)返回回執(zhí)消息 |
注意:以上兩種方式均會(huì)造成MQ性能下降,非必要不建議開(kāi)啟。失敗情況畢竟非常少,可在代碼中通過(guò)輸出日志或存儲(chǔ)數(shù)據(jù)庫(kù)等方式將發(fā)送失敗的消息記錄下來(lái),稍后手動(dòng)處理。
3、數(shù)據(jù)持久化
- 交換機(jī)持久化:在聲明交換器時(shí)將“durable”參數(shù)設(shè)置為true
- 隊(duì)列持久化:在聲明隊(duì)列的時(shí)將“durable”參數(shù)設(shè)置為true
- 消息持久化:生產(chǎn)消息時(shí)設(shè)置屬性delivery_mode=2
SpringBoot項(xiàng)目:
交換機(jī)和消息默認(rèn)為持久化,需自行設(shè)置隊(duì)列持久化。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "xx.queue", durable = "true"),
exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC),
key = "xx"
))
4、惰性隊(duì)列
從RabbitMQ的3.6.0版本開(kāi)始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:
- 接收到消息后直接存入磁盤(pán)而非內(nèi)存
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存
- 支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)
聲明LazyQueue:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "xx.queue", durable = "true",
arguments = {@Argument(name = "x-queue-mode", value = "lazy"),
exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC),
key = "xx"
))
優(yōu)點(diǎn):?
基于磁盤(pán)存儲(chǔ),消息上限高
沒(méi)有間歇性的page-out,性能比較穩(wěn)定
缺點(diǎn):
基于磁盤(pán)存儲(chǔ),消息時(shí)效性會(huì)降低
性能受限于磁盤(pán)的IO
5、消費(fèi)失敗處理
失敗后嘗試在本地重試,重試后依然失敗,將消息投遞到用于投遞失敗消息的交換機(jī),存儲(chǔ)到失敗消息隊(duì)列中,等待后續(xù)手動(dòng)處理。
SpringBoot項(xiàng)目配置:
spring:
rabbitmq:
listener:
simple:
retry:
# 開(kāi)啟失敗重試
enabled: true
?SpringBoot項(xiàng)目配置類:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqErrorConfig {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
二、消息重復(fù)
由于網(wǎng)絡(luò)問(wèn)題或消息生產(chǎn)消費(fèi)過(guò)程中出現(xiàn)問(wèn)題,均會(huì)導(dǎo)致消息重復(fù)的情況。
1、通過(guò)業(yè)務(wù)保證冪等性(優(yōu)先)
在業(yè)務(wù)層面上,保證重復(fù)執(zhí)行對(duì)結(jié)果不產(chǎn)生影響。例如:支付成功后修改訂單狀態(tài),可以將未支付狀態(tài)作為修改語(yǔ)句的執(zhí)行條件。
2、通過(guò)消息狀態(tài)去重保證冪等性
如果不能通過(guò)業(yè)務(wù)保證冪等性,可以將處理過(guò)的消息ID記錄到redis,如果新到的消息ID已經(jīng)在記錄中,那么就不再處理這條消息。
三、消息堆積
1、優(yōu)化消費(fèi)者處理邏輯
優(yōu)化消費(fèi)者處理邏輯,使消費(fèi)者更快處理。
2、增加隊(duì)列及消費(fèi)者數(shù)量
將隊(duì)列綁定多個(gè)消費(fèi)者,提高消費(fèi)速度。
3、使用惰性隊(duì)列
基于磁盤(pán)存儲(chǔ),消息上限高。
四、保證消息順序消費(fèi)
發(fā)生原因:
- 一個(gè)隊(duì)列綁定多個(gè)消費(fèi)者
- 一個(gè)消費(fèi)者開(kāi)啟多個(gè)線程
1、將一個(gè)隊(duì)列拆分成多個(gè)隊(duì)列,保證一個(gè)隊(duì)列只綁定一個(gè)消費(fèi)者,生產(chǎn)者在投遞消息時(shí)根據(jù)業(yè)務(wù)數(shù)據(jù)關(guān)鍵值來(lái)將需要保證先后順序的同一類數(shù)據(jù)發(fā)送到同一個(gè)隊(duì)列當(dāng)中。
2、將隊(duì)列設(shè)置為單活模式文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-776925.html
x-single-active-consumer:?jiǎn)位钅J?,?是否最多只允許?個(gè)消費(fèi)者消費(fèi),如果有多個(gè)消費(fèi)者同時(shí)綁定,則只會(huì)激活第?個(gè),除?第?個(gè)消費(fèi)者被取消或者死亡,才會(huì)?動(dòng)轉(zhuǎn)到下?個(gè)消費(fèi)者。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-776925.html
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "xx.queue", durable = "true",
arguments = {@Argument(name = "x-single-active-consumer", value = "true", type = "java.lang.Boolean")}),
exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC),
key = "xx"
))
到了這里,關(guān)于RabbitMQ常見(jiàn)問(wèn)題及解決方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!