国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【深入淺出RocketMQ原理及實戰(zhàn)】「底層原理挖掘系列」透徹剖析貫穿RocketMQ的消息順序消費和并發(fā)消費機制體系的原理分析

這篇具有很好參考價值的文章主要介紹了【深入淺出RocketMQ原理及實戰(zhàn)】「底層原理挖掘系列」透徹剖析貫穿RocketMQ的消息順序消費和并發(fā)消費機制體系的原理分析。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

DefaultMQPushConsumerImpl拉取消息

首先,DefaultMQPushConsumerImpl 是一個實現(xiàn)了 RocketMQ 的消費者客戶端接口的類。該類的主要作用是從 RocketMQ 的 Broker 獲取消息并進行消費。

主要可以通過pullMessage方法進行獲取對應的操作,如下圖所示。
【深入淺出RocketMQ原理及實戰(zhàn)】「底層原理挖掘系列」透徹剖析貫穿RocketMQ的消息順序消費和并發(fā)消費機制體系的原理分析

在消費消息時,DefaultMQPushConsumerImpl 會將獲取到的消息放入一個processQueue中,processQueue包含了一個TreeMap數(shù)據(jù)結(jié)構(gòu),它按照消息的 commitLogOffset 順序來排列。
【深入淺出RocketMQ原理及實戰(zhàn)】「底層原理挖掘系列」透徹剖析貫穿RocketMQ的消息順序消費和并發(fā)消費機制體系的原理分析

DefaultMQPushConsumerImpl 通過定時的方式,從 Broker 上拉取消息。具體來說,它會調(diào)用DefaultMQPushConsumerImpl 自身定義的PullMessageService類,該類會定時的從消息服務器中拉取消息。

源碼如下所示。
【深入淺出RocketMQ原理及實戰(zhàn)】「底層原理挖掘系列」透徹剖析貫穿RocketMQ的消息順序消費和并發(fā)消費機制體系的原理分析
一旦消息拉取成功,PushConsumer 會將消息交給 processQueue 中的一個隊列進行處理,這個隊列對應同一個消息主題的同一個消息隊列。

processQueue 中的每個消息都會根據(jù)消息的commitLogOffset排列位置。這個位置決定了消息被消費的順序。也就是說,processQueue 存放的順序決定了消息消費的順序。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

consumeMessageService的并發(fā)消費和順序消費

consumeMessageService 是一個用于消費消息的服務方法,它可以實現(xiàn)消息的并發(fā)消費和順序消費。當使用 consumeMessageService 時,需要考慮業(yè)務的實際需求以及消息處理的性質(zhì),權(quán)衡使用并發(fā)消費和順序消費。

并發(fā)消費

并發(fā)消費是指多個消費者同時消費同一批消息以提高處理速度,需要注意消息冪等性以避免重復消費。

DefaultMQPushConsumer的consumeMessageBatchMaxSize參數(shù)默認值為1,表示默認批量消費的消息數(shù)量是1個。在并發(fā)消費方式下,若一個隊列中拉取到32條消息,則會創(chuàng)建32個ConsumeRequest對象,每個ConsumeRequest對象對應1條消息,提交到線程池中運行。

順序消費

順序消費則是按照消息產(chǎn)生的順序逐個消費,適合處理需要順序進行的業(yè)務邏輯,如訂單處理,但實現(xiàn)可能帶來性能瓶頸,需謹慎設(shè)計。指同一時刻,一個 queue 只有一個線程在消費。只讓一個線程消費,由加鎖來實現(xiàn),而順序則由 TreeMap 來實現(xiàn)。

一個隊列中拉取到32條消息,則只會創(chuàng)建一個ConsumeRequest對象,該對象會被提交到線程池中,在ConsumeRequest.run方法中會按照消息的offset順序一條一條地消費,直到TreeMap為空。

concurrently 創(chuàng)建 ConsumeRequest

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

消費者在消費消息時,根據(jù)批量消費的大小來決定是將任務提交到線程池中一次性消費,還是將任務分成多次提交到線程池中進行消費。

首先判斷msgs中消息的數(shù)量是否小于等于一個批量消費數(shù)量consumeBatchSize,如果小于等于,那么將所有消息封裝成一個ConsumeRequest對象并提交到consumeExecutor線程池中,其中dispatchToConsume表示是否立即分發(fā)給消費者消費。

如果消息數(shù)量大于批量消費數(shù)量,那么將消息分段提交到線程池中進行消費。首先通過兩層循環(huán),將msgs中的消息按照consumeBatchSize分成若干個小的MessageExt列表,每個小的MessageExt列表封裝成一個ConsumeRequest對象并提交到consumeExecutor線程池中。

如果線程池提交任務出現(xiàn)拒絕執(zhí)行異常,說明該線程池已經(jīng)滿了,這時候需要將當前小的MessageExt列表繼續(xù)循環(huán)并依次每次取出一個消息封裝成ConsumeRequest對象進行提交,直到所有的小的MessageExt列表被完整地提交到線程池中。若還有未提交的列表,則將該ConsumeRequest對象提交到一個新的線程池中進行定時的重復提交。

concurrently ConsumeRequest#run 消費主體邏輯

消息消費者消費消息的地方,listener.consumeMessage方法會被消費者調(diào)用,將消息列表和消息處理上下文傳入。

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  • msgs是需要消費的消息列表,這里使用了Collections.unmodifiableList方法來創(chuàng)建一個不可修改的消息列表,這是為了保證消息的安全性,防止消息在消費過程中被意外或惡意修改。

  • context是消息處理的上下文,可能包含消費者的訂閱信息、消費進度等信息,可根據(jù)業(yè)務需要進行擴展和使用。

  • consumeMessage方法返回消費結(jié)果,通常是一個枚舉類型,表示消費結(jié)果的狀態(tài),如消費成功、消費失敗等。消費結(jié)果會影響消息處理的下一步流程。

消費結(jié)束之后清除數(shù)據(jù)

主要用于移除已經(jīng)消費完成的消息。直接從 msgTreeMap 中刪除消息,并返回 msgTreeMap 中第一條消息的 queue offset 值。

org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage

public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);
                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }
    return result;
}

具體來說,它接收一個 MessageExt 類型的消息列表msgs,通過遍歷msgs,查找msgTreeMap中相應的消息,將找到的消息刪除并計數(shù),更新msgCount和msgSize這兩個計數(shù)器。代碼中也使用了重入鎖lockTreeMap來保證線程安全。函數(shù)將返回result,表示下一步應該消費的消息的offset,如果沒有可消費的消息,則返回-1。

orderly 創(chuàng)建 ConsumeRequest

在消息消費過程中,判斷是否需要立即將消息分發(fā)給消費者進行消費。

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

首先判斷參數(shù)dispathToConsume為true,如果為true,表示需要立即分發(fā)給消費者消費;否則就不需要進行分發(fā),因為可能等待其他條件觸發(fā)再進行消費。

如果需要立即分發(fā),那么將該消息的消息隊列和消息處理隊列封裝成ConsumeRequest對象,并將該對象提交到consumeExecutor線程池中進行執(zhí)行。每個消費者線程從consumeExecutor線程池中取出ConsumeRequest對象并進行消費。

orderly ConsumeRequest#run 消費主體邏輯

先簡單介紹一下 RocketMQ 消息消費的流程:消費者將消息從 Broker 中拉取到本地的 ProcessQueue 中,然后在 ProcessQueue 中進行消息消費。

// 獲取鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    for (boolean continueConsume = true; continueConsume; ) {
        // 從 TreeMap 中獲得消息
        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
        if (!msgs.isEmpty()) {
            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } else {
            continueConsume = false;
        }
    }
    ...
}

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

首先實例化了 MessageQueueLock,用于保證多線程環(huán)境下的線程同步和互斥。在代碼的第一行中,獲取到了當前 MessageQueue 的鎖對象 objLock。這個鎖對象是在 mqLockTable 中獲取的,mqLockTable 存儲了每個 MessageQueue 的鎖對象,用于對不同的 MessageQueue 進行互斥控制。

在代碼的后面,使用 synchronized 對 objLock 進行加鎖,并進入到了循環(huán)中。在循環(huán)中,調(diào)用 processQueue.takeMessags() 方法從 ProcessQueue 中獲取消息,返回的是一個消息列表。如果消息列表不為空,則調(diào)用 messageListener.consumeMessage() 方法來進行消息消費。

如果消息列表為空,說明當前的 ProcessQueue 中沒有更多的消息,結(jié)束當前的循環(huán),并退出 synchronized 塊,釋放了 objLock 的鎖,等待下一次的消費請求。

整個邏輯是通過鎖機制來實現(xiàn)對 ProcessQueue 進行互斥控制的,保證了多個消費者之間的消費的安全性。同時,使用了循環(huán)來進行多次消費。

順序處理機制

take消息時,將消息從 msgTreeMap 取出,并放入 consumingMsgOrderlyTreeMap。消費完成后,清空 consumingMsgOrderlyTreeMap。將 offset 設(shè)為 this.consumingMsgOrderlyTreeMap.lastKey() + 1,表示已經(jīng)消費的消息的下一條消息的 offset。

// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit

public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}
關(guān)于 offset 提交

offset 是消費者從 broker 拉取的下一條消息的偏移量

消息消費的失敗

  • 順序消費:如果處理某條消息失敗且重試次數(shù)小于閾值,從 consumingMsgOrderlyTreeMap 中取出這條消息并重新放入 msgTreeMap;如果重試次數(shù)超過閾值,則將消息發(fā)送回 broker 并根據(jù)重試次數(shù)決定發(fā)送消息到 SCHDULE_TOPIC_XXXX 或死信隊列

  • 并發(fā)消費:如果處理消息時失敗,則將消息發(fā)送回 broker。如果發(fā)送失敗,將會繼續(xù)消費消息,直到成功消費并提交給 broker。

發(fā)送 ConsumeRequest 的時機有兩個,一是在拉取到消息后,二是在出現(xiàn)異常后延遲提交。文章來源地址http://www.zghlxwxcb.cn/news/detail-501411.html

到了這里,關(guān)于【深入淺出RocketMQ原理及實戰(zhàn)】「底層原理挖掘系列」透徹剖析貫穿RocketMQ的消息順序消費和并發(fā)消費機制體系的原理分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • 《深入淺出SSD:固態(tài)存儲核心技術(shù)、原理與實戰(zhàn)》----學習記錄(二)

    《深入淺出SSD:固態(tài)存儲核心技術(shù)、原理與實戰(zhàn)》----學習記錄(二)

    SSD主要由兩大模塊構(gòu)成—— 主控和閃存介質(zhì) 。其實除了上述兩大模塊外,可選的還有緩存單元。主控是SSD的大腦,承擔著指揮、運算和協(xié)調(diào)的作用,具體表現(xiàn)在 一是實現(xiàn)標準主機接口與主機通信 二是實現(xiàn)與閃存的通信 三是運行SSD內(nèi)部FTL算法 可以說,一款主控芯片的好壞直

    2024年02月12日
    瀏覽(25)
  • 【大蝦送書第七期】深入淺出SSD:固態(tài)存儲核心技術(shù)、原理與實戰(zhàn)

    【大蝦送書第七期】深入淺出SSD:固態(tài)存儲核心技術(shù)、原理與實戰(zhàn)

    目錄 ??寫在前面? ??內(nèi)容簡介 ??作者簡介 ??名人推薦 ??文末福利 ???????博客主頁:大蝦好吃嗎的博客 ? ? ???專欄地址:免費送書活動專欄地址 ????????近年來國家大力支持半導體行業(yè),鼓勵自主創(chuàng)新,中國SSD技術(shù)和產(chǎn)業(yè)良性發(fā)展,產(chǎn)業(yè)鏈在不斷完善,與

    2024年02月10日
    瀏覽(24)
  • 深入淺出FISCO BCOS:區(qū)塊鏈底層平臺

    深入淺出FISCO BCOS:區(qū)塊鏈底層平臺

    ? ? 蘇澤 大家好 這里是蘇澤 一個鐘愛區(qū)塊鏈技術(shù)的后端開發(fā)者 本篇專欄 ?← 持續(xù)記錄本人自學兩年走過無數(shù)彎路的智能合約學習筆記和經(jīng)驗總結(jié) 如果喜歡拜托三連支持~ 目錄 我前面有補充相關(guān)的區(qū)塊鏈的知識 如果沒有了解的話 可能部分概念或名詞會不懂哦 建議先了解一

    2024年03月16日
    瀏覽(39)
  • K8s項目實戰(zhàn)筆記獲阿里技術(shù)大咖力薦,深入淺出解讀容器編排原理與應用

    K8s項目實戰(zhàn)筆記獲阿里技術(shù)大咖力薦,深入淺出解讀容器編排原理與應用

    一、前言 Kubernetes,簡稱K8s,宛如一位技藝高超的舞臺導演,優(yōu)雅地指揮著容器集群的華麗表演。它不僅僅是一個開源的容器集群管理系統(tǒng),更是自動化部署、智能擴縮容與維護等功能的集大成者。作為領(lǐng)軍的容器編排工具,Kubernetes展現(xiàn)了基于容器技術(shù)的分布式架構(gòu)的無盡魅

    2024年03月10日
    瀏覽(22)
  • 【深入淺出Docker原理及實戰(zhàn)】「原理實戰(zhàn)體系」零基礎(chǔ)+全方位帶你學習探索Docker容器開發(fā)實戰(zhàn)指南(Docker-compose使用全解 一)

    【深入淺出Docker原理及實戰(zhàn)】「原理實戰(zhàn)體系」零基礎(chǔ)+全方位帶你學習探索Docker容器開發(fā)實戰(zhàn)指南(Docker-compose使用全解 一)

    Docker Compose是一款用于定義和運行復雜應用程序的Docker工具。在使用Docker容器的應用中,通常由多個容器組成。使用Docker Compose可以擺脫使用shell腳本來啟動容器的繁瑣過程。 Compose通過一個配置文件來管理多個Docker容器。在配置文件中,我們使用services來定義所有的容器。然后

    2024年01月17日
    瀏覽(26)
  • 【深入淺出Spring原理及實戰(zhàn)】「夯實基礎(chǔ)系列」360全方位滲透和探究Spring的核心注解開發(fā)和實現(xiàn)指南(Spring5的常見的注解)

    【深入淺出Spring原理及實戰(zhàn)】「夯實基礎(chǔ)系列」360全方位滲透和探究Spring的核心注解開發(fā)和實現(xiàn)指南(Spring5的常見的注解)

    Spring 5.x中常見的注解包括@Controller、@Service、@Repository。當我們研究Spring Boot源碼時,會發(fā)現(xiàn)實際上提供了更多的注解。了解這些注解對于我們非常重要,盡管目前可能還用不到它們。 注解 功能 @Bean 器中注冊組件,代替來的標簽 @Configuration 聲明這是一個配置類,替換以前的配

    2024年02月16日
    瀏覽(21)
  • 論文解讀:Bert原理深入淺出

    摘取于https://www.jianshu.com/p/810ca25c4502 任務1:Masked Language Model Maked LM 是為了解決單向信息問題,現(xiàn)有的語言模型的問題在于,沒有同時利用雙向信息,如 ELMO 號稱是雙向LM,但實際上是兩個單向 RNN 構(gòu)成的語言模型的拼接,由于時間序列的關(guān)系,RNN模型預測當前詞只依賴前面出

    2024年02月11日
    瀏覽(20)
  • 深入淺出:Zookeeper的原理與實踐

    在當今的信息時代,分布式系統(tǒng)的應用越來越廣泛,而其中一個至關(guān)重要的組成部分就是Zookeeper。作為一個分布式協(xié)調(diào)服務,Zookeeper在保障分布式系統(tǒng)的一致性、可靠性和可用性方面發(fā)揮著不可替代的作用。本博客旨在深入淺出地探討Zookeeper的原理與實踐,幫助讀者全面理解

    2024年04月11日
    瀏覽(27)
  • 深入淺出Java中參數(shù)傳遞的原理

    深入淺出Java中參數(shù)傳遞的原理

    今天,想和大家聊聊關(guān)于java中的參數(shù)傳遞的原理,參數(shù)的傳遞有兩種,值傳遞和引用傳遞。 值傳遞 :是指在調(diào)用函數(shù)時將實際參數(shù)復制一份傳遞到函數(shù)中,這樣在函數(shù)中如果對參數(shù)進行修改,將不會影響到實際參數(shù)。 引用傳遞 :是指在調(diào)用函數(shù)時將實際參數(shù)的地址傳遞到

    2024年02月01日
    瀏覽(18)
  • 深入淺出講解自動駕駛 - 激光雷達原理和結(jié)構(gòu)簡介

    深入淺出講解自動駕駛 - 激光雷達原理和結(jié)構(gòu)簡介

    ?? 個人主頁 : 同學來啦 ?? 版權(quán) : 本文由【同學來啦】原創(chuàng)、在CSDN首發(fā)、需要轉(zhuǎn)載請聯(lián)系博主 ?? 如果文章對你有幫助, 歡迎關(guān)注、點贊、收藏和訂閱專欄哦 激光雷達最先應用于海洋深度探測領(lǐng)域,其實現(xiàn)思路是通過相同回波之間的時間差實現(xiàn)海洋深度測算。后來不斷演

    2024年02月16日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包