国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

[RocketMQ] Consumer消費(fèi)者啟動(dòng)主要流程源碼 (六)

這篇具有很好參考價(jià)值的文章主要介紹了[RocketMQ] Consumer消費(fèi)者啟動(dòng)主要流程源碼 (六)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

客戶端常用的消費(fèi)者類是DefaultMQPushConsumer,

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

DefaultMQPushConsumer的構(gòu)造器以及start方法的源碼。

1.創(chuàng)建DefaultMQPushConsumer實(shí)例

最終都是調(diào)用下面四個(gè)參數(shù)的構(gòu)造函數(shù):

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

/**
 * 創(chuàng)建DefaultMQPushConsumer實(shí)例
 *
 * @param namespace                    namespace地址
 * @param consumerGroup                消費(fèi)者組
 * @param rpcHook                      在每個(gè)遠(yuǎn)程處理命令之前執(zhí)行的RPC鉤子
 * @param allocateMessageQueueStrategy 消費(fèi)者之間消息分配的策略算法
 */
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
                             AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    //創(chuàng)建DefaultMQPushConsumerImpl實(shí)例
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

指定了命名空間、生產(chǎn)者組、RPC鉤子和消費(fèi)者之間消息分配的策略算法的構(gòu)造器, 創(chuàng)建了一個(gè)DefaultMQPushConsumerImpl實(shí)例, DefaultMQPushConsumer可以看作是DefaultMQPushConsumerImpl的包裝類, 給開發(fā)人員用的。DefaultMQPushConsumer可以看作為DefaultMQPushConsumerImpl的門面。

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook
 rpcHook) {
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    this.rpcHook = rpcHook;
    // consumer 狀態(tài)錯(cuò)誤時(shí)采用定時(shí)任務(wù)定時(shí)執(zhí)行拉取請求的時(shí)間間隔
    this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}

創(chuàng)建了DefaultMQPushConsumer實(shí)例之后, 設(shè)置一些屬性, 比如namesrvAddr、consumeFromWhere、注冊messageListener消息監(jiān)聽器等等。這些都是簡單的屬性賦值操作, 除了subscribe方法。

1.1 subscribe訂閱

subscribe方法表示Consumer訂閱的自己感興趣的Topic, 并且支持對消息進(jìn)行過濾, 過濾表達(dá)式支持TAG和SQL92兩種類型, 他們都會被解析為SubscriptionData對象, 最終將topic與SubscriptionData的關(guān)系維護(hù)到RebalanceImpl內(nèi)部的subscriptionInner這個(gè)map集合中。

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

/**
 * DefaultMQPushConsumer的方法
 * <p>
 * 訂閱topic,支持消息過濾表達(dá)式
 *
 * @param topic         訂閱的topic
 * @param subExpression 訂閱表達(dá)式。它僅支持或操作,如“tag1 | | tag2 | | tag3”,如果為 null 或 *,則表示訂閱全部
 */
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
    //調(diào)用defaultMQPushConsumerImpl的subscribe方法
    this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
}

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

/**
 * DefaultMQPushConsumerImpl的方法
 * <p>
 * 訂閱topic
 */
public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        //解析訂閱表達(dá)式,構(gòu)建SubscriptionData
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
        //將topic與SubscriptionData的關(guān)系維護(hù)到RebalanceImpl內(nèi)部的subscriptionInner這個(gè)map集合中
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            //如果mQClientFactory不為null,則發(fā)送心跳信息給所有broker。
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

2.start啟動(dòng)消費(fèi)者

start方法內(nèi)部會執(zhí)行很多初始化操作。

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

/**
 * DefaultMQPushConsumer的方法
 * <p>
 * 啟動(dòng)消費(fèi)者
 */
@Override
public void start() throws MQClientException {
    //根據(jù)namespace和consumerGroup設(shè)置消費(fèi)者組
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    //默認(rèn)消費(fèi)者實(shí)現(xiàn)啟動(dòng)
    this.defaultMQPushConsumerImpl.start();
    //消息軌跡跟蹤服務(wù),默認(rèn)null
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

defaultMQPushConsumerImpl#start方法。

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

rocketmq消費(fèi)者代碼,RocketMq 基礎(chǔ),RocketMq 源碼分析,java-rocketmq,rocketmq,java

  1. checkConfig方法檢查消費(fèi)者的配置信息: 如果consumerGroup為空, 或者長度大于255個(gè)字符, 或者包含非法字符, 或者消費(fèi)者組名為默認(rèn)組名DEFAULT_CONSUMER, 或者messageModel為空, 或者consumeFromWhere為空, 或者consumeTimestamp為空, 或者allocateMessageQueueStrategy為空, 滿足以上任意條件都校驗(yàn)不通過拋出異常。

  2. 調(diào)用copySubscription方法, 拷貝訂閱關(guān)系, 然后為集群消費(fèi)模式的消費(fèi)者配置重試主題用于實(shí)現(xiàn)消費(fèi)重試。

  3. 調(diào)用getOrCreateMQClientInstance方法, 根據(jù)clientId獲取或者創(chuàng)建CreateMQClientInstance實(shí)例, 并賦給mQClientFactory變量。

  4. 設(shè)置負(fù)載均衡服務(wù)rebalanceImpl的相關(guān)屬性。

  5. 創(chuàng)建消息拉取核心對象PullAPIWrapper, 封裝了消息拉取以及結(jié)果解析邏輯的api。

  6. 根據(jù)消息模式設(shè)置不同的OffsetStore, 用于實(shí)現(xiàn)消費(fèi)者的消息消費(fèi)偏移量offset的管理。如果是廣播消費(fèi)模式, 是LocalFileOffsetStore, offset存儲在本地磁盤, 如果是集群模式, 是RemoteBrokerOffsetStore, offset存儲在遠(yuǎn)程broker中。

  7. 調(diào)用offsetStore.load加載消費(fèi)偏移量, LocalFileOffsetStore會加載本地磁盤中的數(shù)據(jù)。

  8. 根據(jù)消息監(jiān)聽器MessageListener的類型創(chuàng)建不同的消息消費(fèi)服務(wù)ConsumeMessageService, MessageListenerOrderly類型表示順序消費(fèi), 創(chuàng)建ConsumeMessageOrderlyService, MessageListenerConcurrently類型表示并發(fā)消費(fèi), 創(chuàng)建ConsumeMessageConcurrentlyService。

  9. 調(diào)用consumeMessageService.start啟動(dòng)消息消費(fèi)服務(wù), 消息拉取服務(wù)PullMessageService拉取到消息后, 會構(gòu)建ConsumeRequest對象交給consumeMessageService去消費(fèi)。

  10. 注冊消費(fèi)者組和消費(fèi)者到MQClientInstance中的consumerTable中, 如果沒有注冊成功, 可能是因?yàn)橥粋€(gè)程序中存在同名消費(fèi)者組的不同消費(fèi)者, 拋出異常。

  11. mQClientFactory#start啟動(dòng)CreateMQClientInstance, 初始化netty服務(wù)、各種定時(shí)任務(wù)、拉取消息服務(wù)、rebalanceService服務(wù)等等。CreateMQClientInstance僅會被初始化一次

  12. updateTopicSubscribeInfoWhenSubscriptionChanged方法: 向NameServer拉取并更新當(dāng)前消費(fèi)者訂閱的topic路由信息。

  13. checkClientInBroker: 隨機(jī)選擇一個(gè)Broker, 發(fā)送檢查客戶端tag配置的請求, 主要是檢測Broker是否支持SQL92類型的tag過濾以及SQL92的tag語法是否正確。

  14. sendHeartbeatToAllBrokerWithLock方法, 發(fā)送心跳給所有broker, Broker接收到后, 發(fā)送Code為NOTIFY_CONSUMER_IDS_CHANGED給Group下所有消費(fèi)者, 要求重新負(fù)載均衡。

  15. rebalanceImmediately方法: 喚醒負(fù)載均衡服務(wù)rebalanceService, 主動(dòng)進(jìn)行一次MessageQueue的重平衡。

/**
 * DefaultMQPushConsumerImpl的方法
 * <p>
 * 啟動(dòng)默認(rèn)消費(fèi)者實(shí)現(xiàn)
 */
public synchronized void start() throws MQClientException {
    //根據(jù)服務(wù)狀態(tài)選擇走不同的代碼分支
    switch (this.serviceState) {
        /*
         * 服務(wù)僅僅創(chuàng)建,而不是啟動(dòng)狀態(tài),那么啟動(dòng)服務(wù)
         */
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            //首先修改服務(wù)狀態(tài)為服務(wù)啟動(dòng)失敗,如果最終啟動(dòng)成功則再修改為RUNNING
            this.serviceState = ServiceState.START_FAILED;
            /*
             * 1 檢查消費(fèi)者的配置信息
             *
             * 如果consumerGroup為空,或者長度大于255個(gè)字符,或者包含非法字符(正常的匹配模式為 ^[%|a-zA-Z0-9_-]+$),或者消費(fèi)者組名為默認(rèn)組名DEFAULT_CONSUMER
             * 或者messageModel為空,或者consumeFromWhere為空,或者consumeTimestamp為空,或者allocateMessageQueueStrategy為空……等等屬性的空校驗(yàn)
             * 滿足以上任意條件都校驗(yàn)不通過拋出異常。
             */
            this.checkConfig();
            /*
             * 2 拷貝拷貝訂閱關(guān)系
             *
             * 為集群消費(fèi)模式的消費(fèi)者,配置其對應(yīng)的重試主題 retryTopic = %RETRY% + consumerGroup
             * 并且設(shè)置當(dāng)前消費(fèi)者自動(dòng)訂閱該消費(fèi)者組對應(yīng)的重試topic,用于實(shí)現(xiàn)消費(fèi)重試。
             */
            this.copySubscription();
            //如果是集群消費(fèi)模式,如果instanceName為默認(rèn)值 "DEFAULT",那么改成 UtilAll.getPid() + "#" + System.nanoTime()
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            /*
             * 3 獲取MQClientManager實(shí)例,然后根據(jù)clientId獲取或者創(chuàng)建CreateMQClientInstance實(shí)例,并賦給mQClientFactory變量
             *
             * MQClientInstance封裝了RocketMQ底層網(wǎng)絡(luò)處理API,Producer、Consumer都會使用到這個(gè)類,是Producer、Consumer與NameServer、Broker 打交道的網(wǎng)絡(luò)通道。
             * 因此,同一個(gè)clientId對應(yīng)同一個(gè)MQClientInstance實(shí)例就可以了,即同一個(gè)應(yīng)用中的多個(gè)producer和consumer使用同一個(gè)MQClientInstance實(shí)例即可。
             */
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            /*
             * 4 設(shè)置負(fù)載均衡服務(wù)的相關(guān)屬性
             */
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            /*
             * 5 創(chuàng)建消息拉取核心對象PullAPIWrapper,封裝了消息拉取及結(jié)果解析邏輯的API
             */
            this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            //為PullAPIWrapper注冊過濾消息的鉤子函數(shù)
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            /*
             * 6 根據(jù)消息模式設(shè)置不同的OffsetStore,用于實(shí)現(xiàn)消費(fèi)者的消息消費(fèi)偏移量offset的管理
             */
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                //根據(jù)不用的消費(fèi)模式選擇不同的OffsetStore實(shí)現(xiàn)
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        //如果是廣播消費(fèi)模式,則是LocalFileOffsetStore,消息消費(fèi)進(jìn)度即offset存儲在本地磁盤中。
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        //如果是集群消費(fèi)模式,則是RemoteBrokerOffsetStore,消息消費(fèi)進(jìn)度即offset存儲在遠(yuǎn)程broker中。
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            /*
             * 7 加載消費(fèi)偏移量,LocalFileOffsetStore會加載本地磁盤中的數(shù)據(jù),RemoteBrokerOffsetStore則是一個(gè)空實(shí)現(xiàn)。
             */
            this.offsetStore.load();
            /*
             * 8 根據(jù)消息監(jiān)聽器的類型創(chuàng)建不同的消息消費(fèi)服務(wù)
             */
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                //如果是MessageListenerOrderly類型,則表示順序消費(fèi),創(chuàng)建ConsumeMessageOrderlyService
                this.consumeOrderly = true;
                this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                //如果是MessageListenerConcurrently類型,則表示并發(fā)消費(fèi),創(chuàng)建ConsumeMessageOrderlyService
                this.consumeOrderly = false;
                this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            //啟動(dòng)消息消費(fèi)服務(wù)
            this.consumeMessageService.start();
            /*
             * 9 注冊消費(fèi)者組和消費(fèi)者到MQClientInstance中的consumerTable中
             */
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                //如果沒注冊成功,那么可能是因?yàn)橥粋€(gè)程序中存在同名消費(fèi)者組的不同消費(fèi)者
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
            }
            /*
             * 10 啟動(dòng)CreateMQClientInstance客戶端通信實(shí)例
             * netty服務(wù)、各種定時(shí)任務(wù)、拉取消息服務(wù)、rebalanceService服務(wù)
             */
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        /**
         * 服務(wù)狀態(tài)是其他的,那么拋出異常,即start方法僅能調(diào)用一次
         */
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
        default:
            break;
    }
    /*
     * 11 后續(xù)處理
     */
    /*
     * 向NameServer拉取并更新當(dāng)前消費(fèi)者訂閱的topic路由信息
     */
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    /*
     * 隨機(jī)選擇一個(gè)Broker,發(fā)送檢查客戶端tag配置的請求,主要是檢測Broker是否支持SQL92類型的tag過濾以及SQL92的tag語法是否正確
     */
    this.mQClientFactory.checkClientInBroker();
    /*
     * 發(fā)送心跳信息給所有broker
     */
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    /*
     * 喚醒負(fù)載均衡服務(wù)rebalanceService,進(jìn)行重平衡
     */
    this.mQClientFactory.rebalanceImmediately();
}
2.1 copySubscription拷貝訂閱關(guān)系

該方法將defaultMQPushConsumer中的訂閱關(guān)系Map集合subscription中的數(shù)據(jù)拷貝到RebalanceImpl的subscriptionInner中。

然后為集群消費(fèi)模式的消費(fèi)者配置重試主題用于實(shí)現(xiàn)消費(fèi)重試。

/**
 * DefaultMQPushConsumerImpl的方法
 * <p>
 * 拷貝訂閱關(guān)系
 *
 * @throws MQClientException
 */
private void copySubscription() throws MQClientException {
    try {
        //將訂閱關(guān)系拷貝到RebalanceImpl的subscriptionInner中
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        //如果messageListenerInner為null,那么將defaultMQPushConsumer的messageListener賦給DefaultMQPushConsumerImpl的messageListenerInner
        //在defaultMQPushConsumer的registerMessageListener方法中就已經(jīng)賦值了
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }
        //消息消費(fèi)模式
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            //廣播消費(fèi)模式,消費(fèi)失敗消息會丟棄
            case BROADCASTING:
                break;
            //集群消費(fèi)模式,支持消費(fèi)失敗重試
            //自動(dòng)訂閱該消費(fèi)者組對應(yīng)的重試topic,默認(rèn)就是這個(gè)模式
            case CLUSTERING:
                //獲取當(dāng)前消費(fèi)者對應(yīng)的重試主題 retryTopic = %RETRY% + consumerGroup
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                //當(dāng)前消費(fèi)者自動(dòng)訂閱該消費(fèi)者組對應(yīng)的重試topic,用于實(shí)現(xiàn)消費(fèi)重試
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

3.總結(jié)

Consumer消費(fèi)者啟動(dòng)的主要流程:文章來源地址http://www.zghlxwxcb.cn/news/detail-579332.html

  1. rebalanceService: 消費(fèi)者負(fù)載均衡服務(wù), 用于確定消費(fèi)者的消息隊(duì)列以及負(fù)載均衡, 同時(shí)觸發(fā)pullMessageService拉取消息的入口。MQClientInstance啟動(dòng), 同一個(gè)服務(wù)器的所有Consumer使用同一個(gè)實(shí)例。
  2. pullMessageService: 拉取消息服務(wù), 由MQClientInstance啟動(dòng), 同一個(gè)服務(wù)器的所有Consumer使用同一個(gè)實(shí)例。
  3. consumeMessageService: 消費(fèi)者消費(fèi)消息服務(wù), 消息拉取到后, 交給這個(gè)服務(wù)。DefaultMQPushConsumerImpl啟動(dòng), 每個(gè)Consumer持有一個(gè)實(shí)例。
  4. OffsetStore: 管理消費(fèi)點(diǎn)位的上報(bào)持久化, DefaultMQPushConsumerImpl啟動(dòng), 每個(gè)Consumer持有一個(gè)實(shí)例。

到了這里,關(guān)于[RocketMQ] Consumer消費(fèi)者啟動(dòng)主要流程源碼 (六)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka-消費(fèi)者-Consumer Group Rebalance設(shè)計(jì)

    Kafka-消費(fèi)者-Consumer Group Rebalance設(shè)計(jì)

    在同一個(gè)Consumer Group中,同一個(gè)Topic的不同分區(qū)會分配給不同的消費(fèi)者進(jìn)行消費(fèi),那么為消費(fèi)者分配分區(qū)的操作是在Kafka服務(wù)端完成的嗎?分區(qū)是如何進(jìn)行分配呢?下面來分析Rebalance操作的原理。 Kafka最開始的解決方案是通過ZooKeeper的Watcher實(shí)現(xiàn)的。 每個(gè)Consumer Group在ZooKeeper下都維

    2024年01月19日
    瀏覽(29)
  • kafka-consumer-groups.sh消費(fèi)者組管理

    kafka-consumer-groups.sh消費(fèi)者組管理

    ??先調(diào)用 MetadataRequest 拿到所有在線Broker列表 再給每個(gè)Broker發(fā)送 ListGroupsRequest 請求獲取 消費(fèi)者組數(shù)據(jù)。 查看指定消費(fèi)組詳情 --group 查看所有消費(fèi)組詳情 --all-groups 查詢消費(fèi)者成員信息 --members 查詢消費(fèi)者狀態(tài)信息 --state 刪除指定消費(fèi)組 --group 刪除所有消費(fèi)組 --all-groups 想要

    2024年02月03日
    瀏覽(39)
  • Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費(fèi)者(Consumer)

    Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費(fèi)者(Consumer)

    Apache Kafka 作為分布式流處理平臺,其架構(gòu)中的生產(chǎn)者和消費(fèi)者是核心組件,負(fù)責(zé)實(shí)現(xiàn)高效的消息生產(chǎn)和消費(fèi)。本文將深入剖析 Kafka 架構(gòu)中生產(chǎn)者和消費(fèi)者的工作原理、核心概念以及高級功能。 1 發(fā)送消息到 Kafka Kafka 生產(chǎn)者負(fù)責(zé)將消息發(fā)布到指定的主題。以下是一個(gè)簡單的生

    2024年02月03日
    瀏覽(50)
  • kafka Consumer 消費(fèi)者使用多線程并發(fā)執(zhí)行,并保證順序消費(fèi), 第一種使用純線程方式、第二種使用Executors線程池

    kafka Consumer 消費(fèi)者使用多線程并發(fā)執(zhí)行,并保證順序消費(fèi), 第一種使用純線程方式、第二種使用Executors線程池

    網(wǎng)上搜索kafka消費(fèi)者通過多線程進(jìn)行順序消費(fèi)的內(nèi)容都不太理想,或者太過復(fù)雜,所以自己寫了幾個(gè)demo,供大家參考指正。 ????????單個(gè)消費(fèi)者,每秒需要處理1000條數(shù)據(jù),每條數(shù)據(jù)的處理時(shí)間為500ms,相同accNum(客戶賬號)的數(shù)據(jù)需要保證消費(fèi)的順序。 1、如果1秒鐘生產(chǎn)

    2024年02月15日
    瀏覽(22)
  • RocketMQ (九) 消費(fèi)者分組-ConsumerGroup

    RocketMQ (九) 消費(fèi)者分組-ConsumerGroup

    消費(fèi)者分組是 Apache RocketMQ 系統(tǒng)中承載多個(gè)消費(fèi)行為一致的消費(fèi)者的負(fù)載均衡分組。 和消費(fèi)者不同,消費(fèi)者分組并不是運(yùn)行實(shí)體,而是一個(gè)邏輯資源。在 Apache RocketMQ 中,通過消費(fèi)者分組內(nèi)初始化多個(gè)消費(fèi)者實(shí)現(xiàn)消費(fèi)性能的水平擴(kuò)展以及高可用容災(zāi)。 在消費(fèi)者分組中,統(tǒng)一定

    2024年02月16日
    瀏覽(19)
  • 29.RocketMQ之消費(fèi)者負(fù)載均衡策略

    消費(fèi)者在消費(fèi)消息的時(shí)候,需要知道從Broker的哪一個(gè)消息隊(duì)列中去獲取消息。所以,在消費(fèi)者端必須要做負(fù)載均衡,即Broker端中多個(gè)消費(fèi)隊(duì)列分配給同一個(gè)消費(fèi)者組中的哪些消費(fèi)者消費(fèi)。 上篇文章講解了負(fù)載均衡過程,這篇文章講解消費(fèi)者負(fù)載均衡的策略。 AllocateMessageQueu

    2024年02月12日
    瀏覽(21)
  • RocketMQ教程-(5)-功能特性-消費(fèi)者分類

    RocketMQ教程-(5)-功能特性-消費(fèi)者分類

    Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 這三種類型的消費(fèi)者,本文分別從使用方式、實(shí)現(xiàn)原理、可靠性重試和適用場景等方面為您介紹這三種類型的消費(fèi)者。 Apache RocketMQ 面向不同的業(yè)務(wù)場景提供了不同消費(fèi)者類型,每種消費(fèi)者類型的集成方式和控制方式都

    2024年02月16日
    瀏覽(19)
  • python rocketmq生產(chǎn)者消費(fèi)者

    安裝依賴包 生產(chǎn)者 需要注意的是假如你用的java SDK 需要只是UNinname 我們可以看到下列代碼設(shè)置了tag以及key,在頁面可以根據(jù)key查找消息 消費(fèi)方式PullConsumer(全部消費(fèi))(可重復(fù)消費(fèi)) 消費(fèi)方式PushConsumer(即時(shí)消費(fèi))(不可重復(fù)消費(fèi)) 生產(chǎn)者發(fā)送消息選擇隊(duì)列,以及設(shè)置順

    2024年02月14日
    瀏覽(18)
  • RocketMQ消費(fèi)者可以手動(dòng)消費(fèi)但無法主動(dòng)消費(fèi)問題,或生成者發(fā)送超時(shí)

    RocketMQ消費(fèi)者可以手動(dòng)消費(fèi)但無法主動(dòng)消費(fèi)問題,或生成者發(fā)送超時(shí)

    修改rocketmq文件夾broker.conf 在RocketMQ獨(dú)享實(shí)例中支持IPv4和IPv6雙棧,主要是通過在網(wǎng)絡(luò)層面上同時(shí)支持IPv4和IPv6協(xié)議棧來實(shí)現(xiàn)的。RocketMQ的Broker端、Namesrv端和客戶端都需要支持IPv4和IPv6協(xié)議,以便能夠同時(shí)監(jiān)聽IPv4和IPv6地址,并使用相應(yīng)的協(xié)議棧進(jìn)行通信。在Broker端,我們需要在

    2024年02月13日
    瀏覽(19)
  • RocketMQ 的消費(fèi)者類型詳解與最佳實(shí)踐

    RocketMQ 的消費(fèi)者類型詳解與最佳實(shí)踐

    作者:凌楚 在 RocketMQ 5.0 中,更加強(qiáng)調(diào)了客戶端類型的概念,尤其是消費(fèi)者類型。為了滿足多樣的 RocketMQ 中一共有三種不同的消費(fèi)者類型,分別是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消費(fèi)者類型對應(yīng)著不同的業(yè)務(wù)場景。 本篇文章也會根據(jù)不同的消費(fèi)者類型來進(jìn)行講述

    2024年02月02日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包