mq常見問題:消息丟失、消息重復消費、消息保證順序
消息丟失問題
拿rabbitmq舉例來說,出現(xiàn)消息丟失的場景如下圖
從圖中可以看到一共有以下三種可能出現(xiàn)消息丟失的情況:
1> 生產(chǎn)者丟消息
生產(chǎn)者在將數(shù)據(jù)發(fā)送到MQ的時候,可能由于網(wǎng)絡等原因造成消息投遞失敗
2>MQ自身丟消息
未開啟RabbitMQ的持久化,數(shù)據(jù)存儲于內(nèi)存,服務掛掉后隊列數(shù)據(jù)丟失;
開啟了RabbitMQ持久化,消息寫入后會持久化到磁盤,但是在落盤的時候掛掉了,不過這種概率很小
3>消費者弄丟了消息
消費者剛接收到消息還沒處理完成,結果消費者掛掉了…
針對以上三種情況,每種情況都有對應的處理方法:
1》生產(chǎn)者弄丟消息的解決方法
方法一:開啟RabbitMQ的事務
rabbitmq提供了與三個事務相關的命令:select開啟事務、commit提交事務、rollback回滾事務
采用該種方法由于事務機制,會導致吞吐量下降,太消耗性能。
方法二:開啟confirm模式
使用springboot時在application.yml配置文件中做如下配置
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
發(fā)送者開啟 confirm 確認機制
publisher-confirm-type: correlated
實現(xiàn)confirm回調(diào)接口
@Slf4j
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息發(fā)送異常!");
//可以進行重發(fā)等操作
} else {
log.info("發(fā)送者已經(jīng)收到確認,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
}
}
}
生產(chǎn)者發(fā)送消息時設置confirm回調(diào)
@Slf4j
@Configuration
public class RabbitMqConfig {
@Bean
public ConfirmCallbackService confirmCallbackService() {
return new ConfirmCallbackService();
}
@Bean
public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
/**
* 消費者確認收到消息后,手動ack回執(zhí)回調(diào)處理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService());
return rabbitTemplate;
}
//其他配置代碼
......
小結: 事務機制和 confirm機制最大的不同在于,事務機制是同步的,你提交一個事務之后會阻塞在那兒,但是 confirm機制是異步的,你發(fā)送個消息之后就可以發(fā)送下一個消息,RabbitMQ 接收了之后會異步回調(diào)confirm接口通知你這個消息接收到了。一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,建議使用用 confirm 機制。
2》MQ自身弄丟消息時的解決方法
使用持久化隊列,發(fā)送消息時做持久化到磁盤處理。
同時設置queue和message持久化以后,RabbitMQ 掛了再次重啟,也會從磁盤上重啟恢復 queue,恢復這個 queue 里的數(shù)據(jù),保證數(shù)據(jù)不會丟失。
但是就算開啟持久化機制,也有可能出現(xiàn)消息落盤時服務掛掉的情況。這時可以考慮結合生產(chǎn)者的confirm機制來處理,持久化機制開啟后消息只有成功落盤時才會通過confirm回調(diào)通知生產(chǎn)者,所以可以考慮生產(chǎn)者在生產(chǎn)消息時維護一個正在等待消息發(fā)送確認的隊列,如果超過一定時間還沒從confirm中收到對應消息的反饋,自動進行重發(fā)處理。
3》消費者自身弄丟消息時的解決方法
關閉自動ACK,使用手動ACK。RabbitMQ中有一個ACK機制,默認情況下消費者接收到到消息,RabbitMQ會自動提交ACK,之后這條消息就不會再發(fā)送給消費者了。我們可以更改為手動ACK模式,每次處理完消息之后,再手動ack一下。不過這樣可能會出現(xiàn)剛處理完還沒手動ack確認,消費者掛了,導致消息重復消費,不過我們只需要保證冪等性就好了,重復消費也不會造成問題。
在springboot中修改application.yml配置文件更改為手動ack模式
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
發(fā)送者開啟 confirm 確認機制
publisher-confirm-type: correlated
發(fā)送者開啟 return 確認機制
publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
設置消費端手動 ack
acknowledge-mode: manual
是否支持重試
retry:
enabled: true
消費端手動ack參考代碼:
@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
try {
//業(yè)務處理代碼
......
//手動ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重復處理失敗,拒絕再次接收...", e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
} else {
log.error("消息即將再次返回隊列處理...", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
消息重復消費問題
出現(xiàn)消息重復消費的情況
原因一
1、生產(chǎn)者發(fā)送給消息隊列以后,消息隊列會應達給生產(chǎn)者,但是這個過程中,消息隊列出問題了沒有收到消息,那么生產(chǎn)者就會重復發(fā)生消息,這時就產(chǎn)生了重復消息。
2、生產(chǎn)者發(fā)生消息給消息隊列,消息隊列由于數(shù)量太大延遲了,生產(chǎn)者等待響應超時了,這時生產(chǎn)者又會從新發(fā)生消息給消息隊列。
3、生產(chǎn)者和消息隊列因網(wǎng)絡問題引起,生產(chǎn)者會發(fā)起重試。這樣也會產(chǎn)生重復消息。
4、其實主要原因就是,消息成功進入了消息隊列,但是由于各種原因消息隊列沒有給生產(chǎn)者成功的返回值,而生產(chǎn)者又有重試機制這種情況下就會產(chǎn)生重復消息。
原因二
1、消息隊列推送給消費者,消費者處理消息這個過程中消費出現(xiàn)了問題,消息隊列不知道消費者處理結果,就會在次投遞。
2、消費者處理完,網(wǎng)絡出現(xiàn)問題,這時沒有給中間件消息隊列返回結果,消息隊列會在次投遞消費者。
3、消費者處理超時,超過了消息隊列的超時時間,這時消息隊列也會再次投遞。
4、消費者處理完結果返回給消息中間件,但是消息中間件出現(xiàn)問題,處理結果丟失了,重啟后,消息中間件內(nèi)部檢查發(fā)現(xiàn)這個消息還沒有處理也會在次投遞給消費者。
針對該問題,一般是在消費者端做冪等處理。
如何保證消息隊列消費的冪等性
這一塊應該還是要結合業(yè)務來選擇合適的方法,有以下幾個方案:
消費數(shù)據(jù)為了單純的寫入數(shù)據(jù)庫,可以先根據(jù)主鍵查詢數(shù)據(jù)是否已經(jīng)存在,如果已經(jīng)存在了就沒必要插入了?;蛘咧苯硬迦胍矝]問題,因為可以利用主鍵的唯一性來保證數(shù)據(jù)不會重復插入,重復插入只會報錯,但不會出現(xiàn)臟數(shù)據(jù)。
消費數(shù)據(jù)只是為了緩存到redis當中,這種情況就是直接往redis中set value了,天然的冪等性。
針對復雜的業(yè)務情況,可以在生產(chǎn)消息的時候給每個消息加一個全局唯一ID,消費者消費消息時根據(jù)這個ID去redis當中查詢之前是否消費過。如果沒有消費過,就進行消費并將這個消息的ID寫入到redis當中。如果已經(jīng)消費過了,就無需再次消費了。
消息保證順序消費
消息在投入到queue的時候是有順序,如果只是單個消費者來處理對應的單個queue,是不會出現(xiàn)消息錯亂的問題。但是在消費的時候有可能多個消費者消費同一個queue,由于各個消費者處理消息的時間不同,導致消息未能按照預期的順序處理。其實根本的問題就是如何保證消息按照預期的順序處理完成。
出現(xiàn)消費順序錯亂的情況
為了提高處理效率,一個queue存在多個consumer
一個queue只存在一個consumer,但是為了提高處理效率,consumer中使用了多線程進行處理
保證消息順序性的方法
將原來的一個queue拆分成多個queue,每個queue都有一個自己的consumer。該種方案的核心是生產(chǎn)者在投遞消息的時候根據(jù)業(yè)務數(shù)據(jù)關鍵值(例如訂單ID哈希值對訂單隊列數(shù)取模)來將需要保證先后順序的同一類數(shù)據(jù)(同一個訂單的數(shù)據(jù)) 發(fā)送到同一個queue當中。
一個queue就一個consumer,在consumer中維護多個內(nèi)存隊列,根據(jù)業(yè)務數(shù)據(jù)關鍵值(例如訂單ID哈希值對內(nèi)存隊列數(shù)取模)將消息加入到不同的內(nèi)存隊列中,然后多個真正負責處理消息的線程去各自對應的內(nèi)存隊列當中獲取消息進行消費。
RabbitMQ保證消息順序性總結:
核心思路就是根據(jù)業(yè)務數(shù)據(jù)關鍵值劃分成多個消息集合,而且每個消息集合中的消息數(shù)據(jù)都是有序的,每個消息集合有自己獨立的一個consumer。多個消息集合的存在保證了消息消費的效率,每個有序的消息集合對應單個的consumer也保證了消息消費時的有序性。
本文總結:
消息丟失:
生產(chǎn)者 建議使用異步confirm(非高并發(fā)需求情況下也可以考慮rabbitmq的事務機制)
mq:做消息持久化
消費者:關閉自動提交offset/自動ack 使用手動處理
重復消費:很容易解決,建立去重表,做冪等處理文章來源:http://www.zghlxwxcb.cn/news/detail-493048.html
如何保證有序:文章來源地址http://www.zghlxwxcb.cn/news/detail-493048.html
多個queue, 每個queue都有一個自己的consumer,將一類消息投遞到一個queue中
消費者維護內(nèi)存隊列,同一類消息hash到一個內(nèi)存隊列中
到了這里,關于mq常見問題:消息丟失、消息重復消費、消息保證順序的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!