DefaultMQPushConsumerImpl拉取消息
首先,DefaultMQPushConsumerImpl
是一個實現(xiàn)了 RocketMQ 的消費者客戶端接口的類。該類的主要作用是從 RocketMQ 的 Broker 獲取消息并進行消費。
主要可以通過pullMessage方法進行獲取對應的操作,如下圖所示。
在消費消息時,DefaultMQPushConsumerImpl
會將獲取到的消息放入一個processQueue
中,processQueue
包含了一個TreeMap
數(shù)據(jù)結(jié)構(gòu),它按照消息的 commitLogOffset
順序來排列。
DefaultMQPushConsumerImpl
通過定時的方式,從 Broker 上拉取消息。具體來說,它會調(diào)用DefaultMQPushConsumerImpl
自身定義的PullMessageService
類,該類會定時的從消息服務器中拉取消息。
源碼如下所示。
一旦消息拉取成功,PushConsumer
會將消息交給 processQueue
中的一個隊列進行處理,這個隊列對應同一個消息主題的同一個消息隊列。
processQueue
中的每個消息都會根據(jù)消息的commitLogOffset
排列位置。這個位置決定了消息被消費的順序。也就是說,processQueue
存放的順序決定了消息消費的順序。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
consumeMessageService的并發(fā)消費和順序消費
consumeMessageService
是一個用于消費消息的服務方法,它可以實現(xiàn)消息的并發(fā)消費和順序消費。當使用 consumeMessageService
時,需要考慮業(yè)務的實際需求以及消息處理的性質(zhì),權(quán)衡使用并發(fā)消費和順序消費。
并發(fā)消費
并發(fā)消費是指多個消費者同時消費同一批消息以提高處理速度,需要注意消息冪等性以避免重復消費。
DefaultMQPushConsumer的consumeMessageBatchMaxSize參數(shù)默認值為1,表示默認批量消費的消息數(shù)量是1個。在并發(fā)消費方式下,若一個隊列中拉取到32條消息,則會創(chuàng)建32個ConsumeRequest對象,每個ConsumeRequest對象對應1條消息,提交到線程池中運行。
順序消費
順序消費則是按照消息產(chǎn)生的順序逐個消費,適合處理需要順序進行的業(yè)務邏輯,如訂單處理,但實現(xiàn)可能帶來性能瓶頸,需謹慎設(shè)計。指同一時刻,一個 queue 只有一個線程在消費。只讓一個線程消費,由加鎖來實現(xiàn),而順序則由 TreeMap 來實現(xiàn)。
一個隊列中拉取到32條消息,則只會創(chuàng)建一個ConsumeRequest對象,該對象會被提交到線程池中,在ConsumeRequest.run方法中會按照消息的offset順序一條一條地消費,直到TreeMap為空。
concurrently 創(chuàng)建 ConsumeRequest
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
消費者在消費消息時,根據(jù)批量消費的大小來決定是將任務提交到線程池中一次性消費,還是將任務分成多次提交到線程池中進行消費。
首先判斷msgs中消息的數(shù)量是否小于等于一個批量消費數(shù)量consumeBatchSize,如果小于等于,那么將所有消息封裝成一個ConsumeRequest對象并提交到consumeExecutor線程池中,其中dispatchToConsume表示是否立即分發(fā)給消費者消費。
如果消息數(shù)量大于批量消費數(shù)量,那么將消息分段提交到線程池中進行消費。首先通過兩層循環(huán),將msgs中的消息按照consumeBatchSize分成若干個小的MessageExt列表,每個小的MessageExt列表封裝成一個ConsumeRequest對象并提交到consumeExecutor線程池中。
如果線程池提交任務出現(xiàn)拒絕執(zhí)行異常,說明該線程池已經(jīng)滿了,這時候需要將當前小的MessageExt列表繼續(xù)循環(huán)并依次每次取出一個消息封裝成ConsumeRequest對象進行提交,直到所有的小的MessageExt列表被完整地提交到線程池中。若還有未提交的列表,則將該ConsumeRequest對象提交到一個新的線程池中進行定時的重復提交。
concurrently ConsumeRequest#run 消費主體邏輯
消息消費者消費消息的地方,listener.consumeMessage方法會被消費者調(diào)用,將消息列表和消息處理上下文傳入。
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
-
msgs是需要消費的消息列表,這里使用了Collections.unmodifiableList方法來創(chuàng)建一個不可修改的消息列表,這是為了保證消息的安全性,防止消息在消費過程中被意外或惡意修改。
-
context是消息處理的上下文,可能包含消費者的訂閱信息、消費進度等信息,可根據(jù)業(yè)務需要進行擴展和使用。
-
consumeMessage方法返回消費結(jié)果,通常是一個枚舉類型,表示消費結(jié)果的狀態(tài),如消費成功、消費失敗等。消費結(jié)果會影響消息處理的下一步流程。
消費結(jié)束之后清除數(shù)據(jù)
主要用于移除已經(jīng)消費完成的消息。直接從 msgTreeMap 中刪除消息,并返回 msgTreeMap 中第一條消息的 queue offset 值。
org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}
具體來說,它接收一個 MessageExt 類型的消息列表msgs,通過遍歷msgs,查找msgTreeMap中相應的消息,將找到的消息刪除并計數(shù),更新msgCount和msgSize這兩個計數(shù)器。代碼中也使用了重入鎖lockTreeMap來保證線程安全。函數(shù)將返回result,表示下一步應該消費的消息的offset,如果沒有可消費的消息,則返回-1。
orderly 創(chuàng)建 ConsumeRequest
在消息消費過程中,判斷是否需要立即將消息分發(fā)給消費者進行消費。
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
首先判斷參數(shù)dispathToConsume為true,如果為true,表示需要立即分發(fā)給消費者消費;否則就不需要進行分發(fā),因為可能等待其他條件觸發(fā)再進行消費。
如果需要立即分發(fā),那么將該消息的消息隊列和消息處理隊列封裝成ConsumeRequest對象,并將該對象提交到consumeExecutor線程池中進行執(zhí)行。每個消費者線程從consumeExecutor線程池中取出ConsumeRequest對象并進行消費。
orderly ConsumeRequest#run 消費主體邏輯
先簡單介紹一下 RocketMQ 消息消費的流程:消費者將消息從 Broker 中拉取到本地的 ProcessQueue 中,然后在 ProcessQueue 中進行消息消費。
// 獲取鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
for (boolean continueConsume = true; continueConsume; ) {
// 從 TreeMap 中獲得消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} else {
continueConsume = false;
}
}
...
}
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
}
首先實例化了 MessageQueueLock,用于保證多線程環(huán)境下的線程同步和互斥。在代碼的第一行中,獲取到了當前 MessageQueue 的鎖對象 objLock。這個鎖對象是在 mqLockTable 中獲取的,mqLockTable 存儲了每個 MessageQueue 的鎖對象,用于對不同的 MessageQueue 進行互斥控制。
在代碼的后面,使用 synchronized 對 objLock 進行加鎖,并進入到了循環(huán)中。在循環(huán)中,調(diào)用 processQueue.takeMessags() 方法從 ProcessQueue 中獲取消息,返回的是一個消息列表。如果消息列表不為空,則調(diào)用 messageListener.consumeMessage() 方法來進行消息消費。
如果消息列表為空,說明當前的 ProcessQueue 中沒有更多的消息,結(jié)束當前的循環(huán),并退出 synchronized 塊,釋放了 objLock 的鎖,等待下一次的消費請求。
整個邏輯是通過鎖機制來實現(xiàn)對 ProcessQueue 進行互斥控制的,保證了多個消費者之間的消費的安全性。同時,使用了循環(huán)來進行多次消費。
順序處理機制
take消息時,將消息從 msgTreeMap 取出,并放入 consumingMsgOrderlyTreeMap。消費完成后,清空 consumingMsgOrderlyTreeMap。將 offset 設(shè)為 this.consumingMsgOrderlyTreeMap.lastKey() + 1,表示已經(jīng)消費的消息的下一條消息的 offset。
// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
關(guān)于 offset 提交
offset 是消費者從 broker 拉取的下一條消息的偏移量
消息消費的失敗
-
順序消費:如果處理某條消息失敗且重試次數(shù)小于閾值,從 consumingMsgOrderlyTreeMap 中取出這條消息并重新放入 msgTreeMap;如果重試次數(shù)超過閾值,則將消息發(fā)送回 broker 并根據(jù)重試次數(shù)決定發(fā)送消息到 SCHDULE_TOPIC_XXXX 或死信隊列
-
并發(fā)消費:如果處理消息時失敗,則將消息發(fā)送回 broker。如果發(fā)送失敗,將會繼續(xù)消費消息,直到成功消費并提交給 broker。文章來源:http://www.zghlxwxcb.cn/news/detail-501411.html
發(fā)送 ConsumeRequest 的時機有兩個,一是在拉取到消息后,二是在出現(xiàn)異常后延遲提交。文章來源地址http://www.zghlxwxcb.cn/news/detail-501411.html
到了這里,關(guān)于【深入淺出RocketMQ原理及實戰(zhàn)】「底層原理挖掘系列」透徹剖析貫穿RocketMQ的消息順序消費和并發(fā)消費機制體系的原理分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!