一、前言
更多RocketMQ內(nèi)容,見專欄:https://blog.csdn.net/saintmm/category_11280399.html
二、消息軌跡
消息軌跡簡單來說就是日志,其把消息的生產(chǎn)、存儲、消費等所有的訪問和操作日志。
1、消息軌跡的引入目的
在項目中存在發(fā)送方與消費方相互“扯皮”的情況:
- 發(fā)送方說消息已經(jīng)發(fā)送成功,而消費方說沒有消費到。
- 這時我們就希望能記錄一條消息的流轉(zhuǎn)軌跡,即:消息是由哪個IP發(fā)送的?什么時候發(fā)送的?是被哪個消費者消費的?
2、如何使用消息軌跡
1> 修改Broker服務(wù)端配置,設(shè)置 traceTopicEnable=true
;
-
表示在Broker上創(chuàng)建名為
RMQ_SYS_TRACE_TOPIC
的topic,隊列個數(shù)為1。所有的msgTrace信息默認都存儲在這個topic中。
2> Producer中開啟消息軌跡;
-
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
- boolean類型的入?yún)?code>enableMsgTrace設(shè)置為true,表示啟用消息軌跡追蹤,默認為false。
-
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
- String類型的入?yún)?code>customizedTraceTopic,表示用于記錄消息軌跡的topic,不設(shè)置默認為
RMQ_SYS_TRACE_TOPIC
。
- String類型的入?yún)?code>customizedTraceTopic,表示用于記錄消息軌跡的topic,不設(shè)置默認為
3> Consuemr中開啟消息軌跡;
-
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
- boolean類型的入?yún)?code>enableMsgTrace設(shè)置為true,表示啟用消息軌跡追蹤,默認為false。
如果啟用了消息軌跡,在消息發(fā)送時盡量為消息指定Key屬性,以便在RocketMQ-Console中對消息進行高性能的查詢。
1)使用案例
1> broker的配置文件(broker.conf)中增加如下配置,然后重啟Broker:
traceTopicEnable = true
2> Producer:
public class TraceProducer {
public static void main(String[] args) throws Exception {
// 第二個參數(shù)TRUE,表示開啟MsgTrace
DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setMaxMessageSize(1024 * 1024 * 10);
producer.start();
Message msg = new Message("test-topic-trace",null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
SendResult send = producer.send(msg);
System.out.println("sendResult: " + send);
// 關(guān)閉生產(chǎn)者
producer.shutdown();
System.out.println("已經(jīng)停機");
}
}
3> Consumer:
public class TraceConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer", true);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("test-topic-trace", "*");
consumer.setConsumeTimeout(20L);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer start。。。。。。");
}
}
2)消息軌跡內(nèi)容
消息軌跡內(nèi)容包括:消息ID(MessageID)、消息Tag、消息Key(MessageKey)、消息的存儲時間、處理消息的客戶端IP、存儲服務(wù)器IP、發(fā)送/消費耗時、消息軌跡狀態(tài)、跟蹤類型。
在RocketMQ-Console中的消息軌跡內(nèi)容如下:
3) RocketMQ-Console中查看消息軌跡
在MessageTrace大分類下有兩種方式可以查看消息軌跡,一種是根據(jù) 原消息Topic + MessageKey、另一種是根據(jù) 原消息Topic + MessageID;
所以建議如果啟用了消息軌跡,在消息發(fā)送時盡量為消息指定Key屬性,以便在RocketMQ-Console中對消息進行高性能的查詢。
1> 根據(jù)Message Key查詢:
2> 根據(jù)Message ID查詢:
3、消息軌跡實現(xiàn)原理
1)消息軌跡數(shù)據(jù)結(jié)構(gòu)
1> 消息軌跡主體內(nèi)容采用TraceContext
類存儲;
public class TraceContext implements Comparable<TraceContext> {
private TraceType traceType;
private long timeStamp = System.currentTimeMillis();
private String regionId = "";
private String regionName = "";
private String groupName = "";
private int costTime = 0;
private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0;
private List<TraceBean> traceBeans;
traceType:跟蹤類型,可選值為Pub(消息發(fā)送)、SubBefore(消息拉取到客戶端,在執(zhí)行業(yè)務(wù)定義的消費邏輯之前)、SubAfter(消費后)。
timeStamp:當(dāng)前時間戳。
regionId:Broker所在的區(qū)域ID,取自BrokerConfig#regionId()。
groupName:組名稱,traceType為Pub時表示生產(chǎn)者組的名稱,traceType為subBefore或subAfter時表示消費組名稱。
requestId:在traceType為subBefore、subAfter時使用,消
費端的請求ID。
contextCode:消費狀態(tài)碼,可選值為SUCCESS、TIME_OUT、EXCEPTION、RETURNNULL、FAILED。
2> 針對消息信息采用TraceBean
類維護;
public class TraceBean {
private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
private String topic = "";
private String msgId = "";
private String offsetMsgId = "";
private String tags = "";
private String keys = "";
private String storeHost = LOCAL_ADDRESS;
private String clientHost = LOCAL_ADDRESS;
private long storeTime;
private int retryTimes;
private int bodyLength;
private MessageType msgType;
}
- topic:消息主題
- msgId:消息唯一ID
- offsetMsgId:消息偏移量ID,該ID中包含了Broker的IP以及偏移量
- tags:消息標(biāo)志
- keys:消息索引key,根據(jù)該key可快速檢索消息
- storeHost:跟蹤類型為Pub時存儲該消息的Broker服務(wù)器IP,跟蹤類型為subBefore、subAfter時存儲消費者IP
- bodyLength:消息體的長度
- msgType:消息的類型,可選值為Normal_Msg(普通消息)、Trans_Msg_Half(預(yù)提交消息)、Trans_msg_Commit(提交消息)、Delay_Msg(延遲消息)
2)軌跡消息存儲
RocketMQ選擇將消息軌跡數(shù)據(jù)當(dāng)作一條消息,存儲在Broker服務(wù)器中。
RocketMQ提供了兩種方式來定義消息軌跡存儲的topic:
- 系統(tǒng)默認topic:如果Broker的traceTopicEnable配置項設(shè)為true,表示在該Broker上創(chuàng)建名為RMQ_SYS_TRACE_TOPIC的topic,隊列個數(shù)為1,默認該值為false。
- 自定義topic:在創(chuàng)建消息生產(chǎn)者或消息消費者時,可以通過參數(shù)自定義用于記錄消息軌跡的topic名稱;
- 注意:RokcetMQ控制臺(rocketmq-console)中只支持配置一個消息軌跡topic,建議使用系統(tǒng)默認的topic。
另外:為了避免消息軌跡的數(shù)據(jù)與正常的業(yè)務(wù)數(shù)據(jù)混在一起,官方建議單獨使用一個Broker用于開啟消息軌跡跟蹤。消息軌跡數(shù)據(jù)只會發(fā)送到這一臺Broker服務(wù)器上,不影響原業(yè)務(wù)Broker的負載壓力。
4、如何采集消息軌跡數(shù)據(jù)
MQ的核心操作是消息發(fā)送和消息存儲,數(shù)據(jù)載體為消息。消息軌跡主要是記錄消息何時發(fā)送到哪臺Broker?發(fā)送耗時是多少?在什么時候被哪個消費者消費?消費耗時是多少?…
1)消息發(fā)送
在消息發(fā)送前后RocketMQ會將本次調(diào)用的信息進行采集。RocketMQ通過提供消息發(fā)送鉤子函數(shù)(SendMessageHook)實現(xiàn),并且為了不明顯增加消息發(fā)送的時間延遲,使用異步的方式記錄消息軌跡。
1> 實例化Producer
在案例中我們知道在實例化DefaultMQProducer
時,需要將入?yún)nableMsgTrace設(shè)置為true,才能開啟消息軌跡。當(dāng)enableMsgTrace為true時,看DefaultMQProducer的構(gòu)造函數(shù):
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
其中:
- SendMessageTraceHookImpl:消息發(fā)送的鉤子函數(shù),用于跟蹤消息軌跡;
- 對應(yīng)的消息消費鉤子函數(shù)為:ConsumeMessageTraceHookImpl
- AsyncTraceDispatcher:消息軌跡異步轉(zhuǎn)發(fā)器。用于在消息生產(chǎn)時,異步將消息軌跡保存到特定topic(默認為
RMQ_SYS_TRACE_TOPIC
)
2> Producer發(fā)送消息
在DefaultMqProducerImple#sendKernelImpl方法中,會首先判斷是否有發(fā)送消息鉤子函數(shù)(SendMessageHook
);如果有:
- 在發(fā)送消息之前調(diào)用鉤子函數(shù)SendMessageHook#sendMessageBefore()方法,將消息軌跡數(shù)據(jù)先存儲在調(diào)用上下文中。
- 在發(fā)送消息之后調(diào)用鉤子函數(shù)SendMessageHook#sendMessageAfter()方法,使用AsyncTraceDispatcher異步將消息軌跡數(shù)據(jù)發(fā)送到消息服務(wù)器(Broker)上。
無論是DefaultMQProducerImpl中的executeSendMessageHookBefore()方法還是executeSendMessageHookAfter()方法,內(nèi)部都是調(diào)用所有SendMessageHook的相應(yīng)before()、after()方法。
public void executeSendMessageHookBefore(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageBefore(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookBefore", e);
}
}
}
}
public void executeSendMessageHookAfter(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageAfter(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookAfter", e);
}
}
}
}
<1> sendMessageBefore()
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
tuxeContext.getTraceBeans().add(traceBean);
}
方法邏輯很簡單:只是在消息發(fā)送之前先收集消息的topic、tag、key、存儲Broker的IP地址、消息體的長度等基礎(chǔ)信息,并將消息軌跡數(shù)據(jù)先存儲在調(diào)用上下文中。
<2> sendMessageAfter()
@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {
return;
}
if (context.getSendResult().getRegionId() == null
|| !context.getSendResult().isTraceOn()) {
// if switch is false,skip it
return;
}
TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
tuxeContext.setSuccess(true);
} else {
tuxeContext.setSuccess(false);
}
tuxeContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
localDispatcher.append(tuxeContext);
}
消息發(fā)送到Broker返回之后會調(diào)用到sendMessageAfter()方法。
進行一些校驗:
- 如果消息發(fā)送上下為空 或 發(fā)送消息結(jié)果為空,則直接返回,不記錄消息軌跡數(shù)據(jù)。
進一步維護消息軌跡數(shù)據(jù);
從MqTraceContext中獲取跟蹤的TraceBean,雖然TraceContext中將TraceBean設(shè)計成List結(jié)構(gòu),但在消息發(fā)送時,這里的數(shù)據(jù)永遠只有一條,即使是批量發(fā)送。
設(shè)置costTime(消息發(fā)送耗時)、success(是否發(fā)送成功)、regionId(發(fā)送到Broker所在的分區(qū))、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,則是最后一條消息的物理偏移量)、storeTime。
注意:storeTime并不是真實的消息存儲時間,而是一個估算值,取自:客戶端發(fā)送消息耗時的一半。
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
關(guān)鍵點:消息軌跡的異步發(fā)送;
將消息軌跡添加到AsyncTraceDispatcher中的阻塞隊列
traceContextQueue
中,以供后續(xù)異步發(fā)送消息軌跡使用;
在實例化Producer時,如果開啟了消息軌跡,會實例化
AsyncTraceDispatcher
;并且在啟動Producer時也會啟動AsyncTraceDispatcher
;
- 最終目的是從阻塞隊列
traceContextQueue
中找到待異步發(fā)送的軌跡消息,然后發(fā)送到相應(yīng)Broker中。
<3> 消息軌跡異步發(fā)送
進一步看AsyncTraceDispatcher#start()方法:
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
其中啟動一個后臺線程,線程的具體邏輯體現(xiàn)在AsyncRunnable
中。
class AsyncRunnable implements Runnable {
private boolean stopped;
@Override
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
//get trace data element from blocking Queue — traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
- 從待發(fā)送隊列
traceContextQueue
中不斷獲取消息軌跡的數(shù)據(jù),并將其異步發(fā)送到消息服務(wù)器。 - 為了提高消息的發(fā)送效率引入批量機制,即:一次從隊列中獲取一批消息
- 然后封裝成AsyncAppenderRequest任務(wù)并提交到線程池中異步執(zhí)行;
- 真正的發(fā)送消息軌跡數(shù)據(jù)的邏輯被封裝在AsyncTraceDispatcher的內(nèi)部類
AsyncAppenderRequest#run()
方法中。
class AsyncAppenderRequest implements Runnable {
List<TraceContext> contextList;
public AsyncAppenderRequest(final List<TraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<TraceContext>(1);
}
}
@Override
public void run() {
sendTraceData(contextList);
}
public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
for (TraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) {
continue;
}
// Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic();
String regionId = context.getRegionId();
// Use original message entity's topic as key
String key = topic;
if (!StringUtils.isBlank(regionId)) {
key = key + TraceConstants.CONTENT_SPLITOR + regionId;
}
List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>();
transBeanMap.put(key, transBeanList);
}
TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}
for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
String dataTopic = entry.getKey();
String regionId = null;
if (key.length > 1) {
dataTopic = key[0];
regionId = key[1];
}
flushData(entry.getValue(), dataTopic, regionId);
}
}
- 按照消息軌跡的存儲協(xié)議對消息軌跡內(nèi)容進行組裝、編碼;
- 按照topic分批調(diào)用flushData()方法將消息發(fā)送到Broker中,完成消息軌跡數(shù)據(jù)的存儲。
2)消息消費
同樣,在消息消費前后RocketMQ會將本次調(diào)用的信息進行采集。RocketMQ通過提供消息消費鉤子函數(shù)(ConsumeMessageHook)實現(xiàn),并且為了不明顯增加消息消費的時間延遲,使用異步的方式記錄消息軌跡。
消息消費和消息發(fā)送是一樣的機制,這里就不冗余介紹了,貼幾個代碼截圖也就一目了然了。
-
實例化消費者
-
啟動消費者
-
以并行消費消息為例:
public void executeHookBefore(final ConsumeMessageContext context) { if (!this.consumeMessageHookList.isEmpty()) { for (ConsumeMessageHook hook : this.consumeMessageHookList) { try { hook.consumeMessageBefore(context); } catch (Throwable e) { } } } } public void executeHookAfter(final ConsumeMessageContext context) { if (!this.consumeMessageHookList.isEmpty()) { for (ConsumeMessageHook hook : this.consumeMessageHookList) { try { hook.consumeMessageAfter(context); } catch (Throwable e) { } } } }
三、總結(jié)
消息軌跡其實就是記錄消息從發(fā)送 到 存儲 再到 消費,整個消息生命周期中的一些關(guān)鍵信息,比如:誰發(fā)送的、發(fā)送耗時多久、消息保存在哪了、誰消費了、消費耗時多久。
在RocketMQ中的實現(xiàn)方式也很簡單,在消息發(fā)送/消費前后基于鉤子函數(shù),做before()、after()邏輯,進而記錄消息軌跡信息。
特別注意:storeTime并不是真實的消息存儲時間,而是一個估算值,取自:客戶端發(fā)送消息耗時的一半。文章來源:http://www.zghlxwxcb.cn/news/detail-795973.html
消息軌跡功能涉及到的關(guān)鍵類:文章來源地址http://www.zghlxwxcb.cn/news/detail-795973.html
- AsyncTraceDispatcher:負責(zé)異步發(fā)送消息軌跡到Broker。
- ConsumeMessageHook:消費消息鉤子函數(shù)
- SendMessageHook:生產(chǎn)消息鉤子函數(shù)
- TraceContext、TraceBean:兩者一起用于表述消息軌跡
到了這里,關(guān)于RocketMQ如何實現(xiàn)消息軌跡:消息何時發(fā)送的?耗時多久?誰消費的?存在哪個broker了?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!