客戶端常用的消費(fèi)者類是DefaultMQPushConsumer,
DefaultMQPushConsumer的構(gòu)造器以及start方法的源碼。
1.創(chuàng)建DefaultMQPushConsumer實(shí)例
最終都是調(diào)用下面四個(gè)參數(shù)的構(gòu)造函數(shù):
/**
* 創(chuàng)建DefaultMQPushConsumer實(shí)例
*
* @param namespace namespace地址
* @param consumerGroup 消費(fèi)者組
* @param rpcHook 在每個(gè)遠(yuǎn)程處理命令之前執(zhí)行的RPC鉤子
* @param allocateMessageQueueStrategy 消費(fèi)者之間消息分配的策略算法
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
//創(chuàng)建DefaultMQPushConsumerImpl實(shí)例
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
指定了命名空間、生產(chǎn)者組、RPC鉤子和消費(fèi)者之間消息分配的策略算法的構(gòu)造器, 創(chuàng)建了一個(gè)DefaultMQPushConsumerImpl實(shí)例, DefaultMQPushConsumer可以看作是DefaultMQPushConsumerImpl的包裝類, 給開發(fā)人員用的。DefaultMQPushConsumer可以看作為DefaultMQPushConsumerImpl的門面。
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook
rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
// consumer 狀態(tài)錯(cuò)誤時(shí)采用定時(shí)任務(wù)定時(shí)執(zhí)行拉取請求的時(shí)間間隔
this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
創(chuàng)建了DefaultMQPushConsumer實(shí)例之后, 設(shè)置一些屬性, 比如namesrvAddr、consumeFromWhere、注冊messageListener消息監(jiān)聽器等等。這些都是簡單的屬性賦值操作, 除了subscribe方法。
1.1 subscribe訂閱
subscribe方法表示Consumer訂閱的自己感興趣的Topic, 并且支持對消息進(jìn)行過濾, 過濾表達(dá)式支持TAG和SQL92兩種類型, 他們都會被解析為SubscriptionData對象, 最終將topic與SubscriptionData的關(guān)系維護(hù)到RebalanceImpl內(nèi)部的subscriptionInner這個(gè)map集合中。
/**
* DefaultMQPushConsumer的方法
* <p>
* 訂閱topic,支持消息過濾表達(dá)式
*
* @param topic 訂閱的topic
* @param subExpression 訂閱表達(dá)式。它僅支持或操作,如“tag1 | | tag2 | | tag3”,如果為 null 或 *,則表示訂閱全部
*/
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
//調(diào)用defaultMQPushConsumerImpl的subscribe方法
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
/**
* DefaultMQPushConsumerImpl的方法
* <p>
* 訂閱topic
*/
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
//解析訂閱表達(dá)式,構(gòu)建SubscriptionData
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
//將topic與SubscriptionData的關(guān)系維護(hù)到RebalanceImpl內(nèi)部的subscriptionInner這個(gè)map集合中
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
//如果mQClientFactory不為null,則發(fā)送心跳信息給所有broker。
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
2.start啟動(dòng)消費(fèi)者
start方法內(nèi)部會執(zhí)行很多初始化操作。
/**
* DefaultMQPushConsumer的方法
* <p>
* 啟動(dòng)消費(fèi)者
*/
@Override
public void start() throws MQClientException {
//根據(jù)namespace和consumerGroup設(shè)置消費(fèi)者組
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
//默認(rèn)消費(fèi)者實(shí)現(xiàn)啟動(dòng)
this.defaultMQPushConsumerImpl.start();
//消息軌跡跟蹤服務(wù),默認(rèn)null
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
defaultMQPushConsumerImpl#start方法。
-
checkConfig方法檢查消費(fèi)者的配置信息: 如果consumerGroup為空, 或者長度大于255個(gè)字符, 或者包含非法字符, 或者消費(fèi)者組名為默認(rèn)組名DEFAULT_CONSUMER, 或者messageModel為空, 或者consumeFromWhere為空, 或者consumeTimestamp為空, 或者allocateMessageQueueStrategy為空, 滿足以上任意條件都校驗(yàn)不通過拋出異常。
-
調(diào)用copySubscription方法, 拷貝訂閱關(guān)系, 然后為集群消費(fèi)模式的消費(fèi)者配置重試主題用于實(shí)現(xiàn)消費(fèi)重試。
-
調(diào)用getOrCreateMQClientInstance方法, 根據(jù)clientId獲取或者創(chuàng)建CreateMQClientInstance實(shí)例, 并賦給mQClientFactory變量。
-
設(shè)置負(fù)載均衡服務(wù)rebalanceImpl的相關(guān)屬性。
-
創(chuàng)建消息拉取核心對象PullAPIWrapper, 封裝了消息拉取以及結(jié)果解析邏輯的api。
-
根據(jù)消息模式設(shè)置不同的OffsetStore, 用于實(shí)現(xiàn)消費(fèi)者的消息消費(fèi)偏移量offset的管理。如果是廣播消費(fèi)模式, 是LocalFileOffsetStore, offset存儲在本地磁盤, 如果是集群模式, 是RemoteBrokerOffsetStore, offset存儲在遠(yuǎn)程broker中。
-
調(diào)用offsetStore.load加載消費(fèi)偏移量, LocalFileOffsetStore會加載本地磁盤中的數(shù)據(jù)。
-
根據(jù)消息監(jiān)聽器MessageListener的類型創(chuàng)建不同的消息消費(fèi)服務(wù)ConsumeMessageService, MessageListenerOrderly類型表示順序消費(fèi), 創(chuàng)建ConsumeMessageOrderlyService, MessageListenerConcurrently類型表示并發(fā)消費(fèi), 創(chuàng)建ConsumeMessageConcurrentlyService。
-
調(diào)用consumeMessageService.start啟動(dòng)消息消費(fèi)服務(wù), 消息拉取服務(wù)PullMessageService拉取到消息后, 會構(gòu)建ConsumeRequest對象交給consumeMessageService去消費(fèi)。
-
注冊消費(fèi)者組和消費(fèi)者到MQClientInstance中的consumerTable中, 如果沒有注冊成功, 可能是因?yàn)橥粋€(gè)程序中存在同名消費(fèi)者組的不同消費(fèi)者, 拋出異常。
-
mQClientFactory#start啟動(dòng)CreateMQClientInstance, 初始化netty服務(wù)、各種定時(shí)任務(wù)、拉取消息服務(wù)、rebalanceService服務(wù)等等。CreateMQClientInstance僅會被初始化一次
-
updateTopicSubscribeInfoWhenSubscriptionChanged方法: 向NameServer拉取并更新當(dāng)前消費(fèi)者訂閱的topic路由信息。
-
checkClientInBroker: 隨機(jī)選擇一個(gè)Broker, 發(fā)送檢查客戶端tag配置的請求, 主要是檢測Broker是否支持SQL92類型的tag過濾以及SQL92的tag語法是否正確。
-
sendHeartbeatToAllBrokerWithLock方法, 發(fā)送心跳給所有broker, Broker接收到后, 發(fā)送Code為NOTIFY_CONSUMER_IDS_CHANGED給Group下所有消費(fèi)者, 要求重新負(fù)載均衡。
-
rebalanceImmediately方法: 喚醒負(fù)載均衡服務(wù)rebalanceService, 主動(dòng)進(jìn)行一次MessageQueue的重平衡。
/**
* DefaultMQPushConsumerImpl的方法
* <p>
* 啟動(dòng)默認(rèn)消費(fèi)者實(shí)現(xiàn)
*/
public synchronized void start() throws MQClientException {
//根據(jù)服務(wù)狀態(tài)選擇走不同的代碼分支
switch (this.serviceState) {
/*
* 服務(wù)僅僅創(chuàng)建,而不是啟動(dòng)狀態(tài),那么啟動(dòng)服務(wù)
*/
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
//首先修改服務(wù)狀態(tài)為服務(wù)啟動(dòng)失敗,如果最終啟動(dòng)成功則再修改為RUNNING
this.serviceState = ServiceState.START_FAILED;
/*
* 1 檢查消費(fèi)者的配置信息
*
* 如果consumerGroup為空,或者長度大于255個(gè)字符,或者包含非法字符(正常的匹配模式為 ^[%|a-zA-Z0-9_-]+$),或者消費(fèi)者組名為默認(rèn)組名DEFAULT_CONSUMER
* 或者messageModel為空,或者consumeFromWhere為空,或者consumeTimestamp為空,或者allocateMessageQueueStrategy為空……等等屬性的空校驗(yàn)
* 滿足以上任意條件都校驗(yàn)不通過拋出異常。
*/
this.checkConfig();
/*
* 2 拷貝拷貝訂閱關(guān)系
*
* 為集群消費(fèi)模式的消費(fèi)者,配置其對應(yīng)的重試主題 retryTopic = %RETRY% + consumerGroup
* 并且設(shè)置當(dāng)前消費(fèi)者自動(dòng)訂閱該消費(fèi)者組對應(yīng)的重試topic,用于實(shí)現(xiàn)消費(fèi)重試。
*/
this.copySubscription();
//如果是集群消費(fèi)模式,如果instanceName為默認(rèn)值 "DEFAULT",那么改成 UtilAll.getPid() + "#" + System.nanoTime()
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
/*
* 3 獲取MQClientManager實(shí)例,然后根據(jù)clientId獲取或者創(chuàng)建CreateMQClientInstance實(shí)例,并賦給mQClientFactory變量
*
* MQClientInstance封裝了RocketMQ底層網(wǎng)絡(luò)處理API,Producer、Consumer都會使用到這個(gè)類,是Producer、Consumer與NameServer、Broker 打交道的網(wǎng)絡(luò)通道。
* 因此,同一個(gè)clientId對應(yīng)同一個(gè)MQClientInstance實(shí)例就可以了,即同一個(gè)應(yīng)用中的多個(gè)producer和consumer使用同一個(gè)MQClientInstance實(shí)例即可。
*/
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
/*
* 4 設(shè)置負(fù)載均衡服務(wù)的相關(guān)屬性
*/
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
/*
* 5 創(chuàng)建消息拉取核心對象PullAPIWrapper,封裝了消息拉取及結(jié)果解析邏輯的API
*/
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
//為PullAPIWrapper注冊過濾消息的鉤子函數(shù)
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
/*
* 6 根據(jù)消息模式設(shè)置不同的OffsetStore,用于實(shí)現(xiàn)消費(fèi)者的消息消費(fèi)偏移量offset的管理
*/
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
//根據(jù)不用的消費(fèi)模式選擇不同的OffsetStore實(shí)現(xiàn)
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//如果是廣播消費(fèi)模式,則是LocalFileOffsetStore,消息消費(fèi)進(jìn)度即offset存儲在本地磁盤中。
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
//如果是集群消費(fèi)模式,則是RemoteBrokerOffsetStore,消息消費(fèi)進(jìn)度即offset存儲在遠(yuǎn)程broker中。
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
/*
* 7 加載消費(fèi)偏移量,LocalFileOffsetStore會加載本地磁盤中的數(shù)據(jù),RemoteBrokerOffsetStore則是一個(gè)空實(shí)現(xiàn)。
*/
this.offsetStore.load();
/*
* 8 根據(jù)消息監(jiān)聽器的類型創(chuàng)建不同的消息消費(fèi)服務(wù)
*/
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
//如果是MessageListenerOrderly類型,則表示順序消費(fèi),創(chuàng)建ConsumeMessageOrderlyService
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
//如果是MessageListenerConcurrently類型,則表示并發(fā)消費(fèi),創(chuàng)建ConsumeMessageOrderlyService
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//啟動(dòng)消息消費(fèi)服務(wù)
this.consumeMessageService.start();
/*
* 9 注冊消費(fèi)者組和消費(fèi)者到MQClientInstance中的consumerTable中
*/
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
//如果沒注冊成功,那么可能是因?yàn)橥粋€(gè)程序中存在同名消費(fèi)者組的不同消費(fèi)者
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
/*
* 10 啟動(dòng)CreateMQClientInstance客戶端通信實(shí)例
* netty服務(wù)、各種定時(shí)任務(wù)、拉取消息服務(wù)、rebalanceService服務(wù)
*/
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
/**
* 服務(wù)狀態(tài)是其他的,那么拋出異常,即start方法僅能調(diào)用一次
*/
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
/*
* 11 后續(xù)處理
*/
/*
* 向NameServer拉取并更新當(dāng)前消費(fèi)者訂閱的topic路由信息
*/
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
/*
* 隨機(jī)選擇一個(gè)Broker,發(fā)送檢查客戶端tag配置的請求,主要是檢測Broker是否支持SQL92類型的tag過濾以及SQL92的tag語法是否正確
*/
this.mQClientFactory.checkClientInBroker();
/*
* 發(fā)送心跳信息給所有broker
*/
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
/*
* 喚醒負(fù)載均衡服務(wù)rebalanceService,進(jìn)行重平衡
*/
this.mQClientFactory.rebalanceImmediately();
}
2.1 copySubscription拷貝訂閱關(guān)系
該方法將defaultMQPushConsumer中的訂閱關(guān)系Map集合subscription中的數(shù)據(jù)拷貝到RebalanceImpl的subscriptionInner中。
然后為集群消費(fèi)模式的消費(fèi)者配置重試主題用于實(shí)現(xiàn)消費(fèi)重試。文章來源:http://www.zghlxwxcb.cn/news/detail-579332.html
/**
* DefaultMQPushConsumerImpl的方法
* <p>
* 拷貝訂閱關(guān)系
*
* @throws MQClientException
*/
private void copySubscription() throws MQClientException {
try {
//將訂閱關(guān)系拷貝到RebalanceImpl的subscriptionInner中
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
//如果messageListenerInner為null,那么將defaultMQPushConsumer的messageListener賦給DefaultMQPushConsumerImpl的messageListenerInner
//在defaultMQPushConsumer的registerMessageListener方法中就已經(jīng)賦值了
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
//消息消費(fèi)模式
switch (this.defaultMQPushConsumer.getMessageModel()) {
//廣播消費(fèi)模式,消費(fèi)失敗消息會丟棄
case BROADCASTING:
break;
//集群消費(fèi)模式,支持消費(fèi)失敗重試
//自動(dòng)訂閱該消費(fèi)者組對應(yīng)的重試topic,默認(rèn)就是這個(gè)模式
case CLUSTERING:
//獲取當(dāng)前消費(fèi)者對應(yīng)的重試主題 retryTopic = %RETRY% + consumerGroup
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
//當(dāng)前消費(fèi)者自動(dòng)訂閱該消費(fèi)者組對應(yīng)的重試topic,用于實(shí)現(xiàn)消費(fèi)重試
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
3.總結(jié)
Consumer消費(fèi)者啟動(dòng)的主要流程:文章來源地址http://www.zghlxwxcb.cn/news/detail-579332.html
- rebalanceService: 消費(fèi)者負(fù)載均衡服務(wù), 用于確定消費(fèi)者的消息隊(duì)列以及負(fù)載均衡, 同時(shí)觸發(fā)pullMessageService拉取消息的入口。MQClientInstance啟動(dòng), 同一個(gè)服務(wù)器的所有Consumer使用同一個(gè)實(shí)例。
- pullMessageService: 拉取消息服務(wù), 由MQClientInstance啟動(dòng), 同一個(gè)服務(wù)器的所有Consumer使用同一個(gè)實(shí)例。
- consumeMessageService: 消費(fèi)者消費(fèi)消息服務(wù), 消息拉取到后, 交給這個(gè)服務(wù)。DefaultMQPushConsumerImpl啟動(dòng), 每個(gè)Consumer持有一個(gè)實(shí)例。
- OffsetStore: 管理消費(fèi)點(diǎn)位的上報(bào)持久化, DefaultMQPushConsumerImpl啟動(dòng), 每個(gè)Consumer持有一個(gè)實(shí)例。
到了這里,關(guān)于[RocketMQ] Consumer消費(fèi)者啟動(dòng)主要流程源碼 (六)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!