国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RocketMQ如何實現(xiàn)消息軌跡:消息何時發(fā)送的?耗時多久?誰消費的?存在哪個broker了?

這篇具有很好參考價值的文章主要介紹了RocketMQ如何實現(xiàn)消息軌跡:消息何時發(fā)送的?耗時多久?誰消費的?存在哪個broker了?。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、前言

更多RocketMQ內(nèi)容,見專欄:https://blog.csdn.net/saintmm/category_11280399.html

二、消息軌跡

消息軌跡簡單來說就是日志,其把消息的生產(chǎn)、存儲、消費等所有的訪問和操作日志。

1、消息軌跡的引入目的

在項目中存在發(fā)送方與消費方相互“扯皮”的情況:

  • 發(fā)送方說消息已經(jīng)發(fā)送成功,而消費方說沒有消費到。
  • 這時我們就希望能記錄一條消息的流轉(zhuǎn)軌跡,即:消息是由哪個IP發(fā)送的?什么時候發(fā)送的?是被哪個消費者消費的?

2、如何使用消息軌跡

1> 修改Broker服務(wù)端配置,設(shè)置 traceTopicEnable=true

  • 表示在Broker上創(chuàng)建名為RMQ_SYS_TRACE_TOPIC的topic,隊

    列個數(shù)為1。所有的msgTrace信息默認都存儲在這個topic中。

2> Producer中開啟消息軌跡;

  •   public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
    
    • boolean類型的入?yún)?code>enableMsgTrace設(shè)置為true,表示啟用消息軌跡追蹤,默認為false。
  •   public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
    
    • String類型的入?yún)?code>customizedTraceTopic,表示用于記錄消息軌跡的topic,不設(shè)置默認為RMQ_SYS_TRACE_TOPIC

3> Consuemr中開啟消息軌跡;

  •   public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
    
    • boolean類型的入?yún)?code>enableMsgTrace設(shè)置為true,表示啟用消息軌跡追蹤,默認為false。

如果啟用了消息軌跡,在消息發(fā)送時盡量為消息指定Key屬性,以便在RocketMQ-Console中對消息進行高性能的查詢。

1)使用案例

1> broker的配置文件(broker.conf)中增加如下配置,然后重啟Broker:

traceTopicEnable = true

2> Producer:

public class TraceProducer {
    public static void main(String[] args) throws Exception {
        // 第二個參數(shù)TRUE,表示開啟MsgTrace
        DefaultMQProducer producer = new DefaultMQProducer("saint-test", true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setMaxMessageSize(1024 * 1024 * 10);
        producer.start();

        Message msg = new Message("test-topic-trace",null, "key-trace", "trace-2".getBytes(StandardCharsets.UTF_8));
        SendResult send = producer.send(msg);

        System.out.println("sendResult: " + send);

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
        System.out.println("已經(jīng)停機");
    }
}

3> Consumer:

public class TraceConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer", true);
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("test-topic-trace", "*");
        consumer.setConsumeTimeout(20L);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("Consumer start。。。。。。");

    }
}

2)消息軌跡內(nèi)容

消息軌跡內(nèi)容包括:消息ID(MessageID)、消息Tag、消息Key(MessageKey)、消息的存儲時間、處理消息的客戶端IP、存儲服務(wù)器IP、發(fā)送/消費耗時、消息軌跡狀態(tài)、跟蹤類型。

在RocketMQ-Console中的消息軌跡內(nèi)容如下:
rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

3) RocketMQ-Console中查看消息軌跡

在MessageTrace大分類下有兩種方式可以查看消息軌跡,一種是根據(jù) 原消息Topic + MessageKey、另一種是根據(jù) 原消息Topic + MessageID;
rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

所以建議如果啟用了消息軌跡,在消息發(fā)送時盡量為消息指定Key屬性,以便在RocketMQ-Console中對消息進行高性能的查詢。

1> 根據(jù)Message Key查詢:
rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

2> 根據(jù)Message ID查詢:
rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

3、消息軌跡實現(xiàn)原理

1)消息軌跡數(shù)據(jù)結(jié)構(gòu)

1> 消息軌跡主體內(nèi)容采用TraceContext類存儲;

public class TraceContext implements Comparable<TraceContext> {

    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;
  1. traceType:跟蹤類型,可選值為Pub(消息發(fā)送)、SubBefore(消息拉取到客戶端,在執(zhí)行業(yè)務(wù)定義的消費邏輯之前)、SubAfter(消費后)。

  2. timeStamp:當(dāng)前時間戳。

  3. regionId:Broker所在的區(qū)域ID,取自BrokerConfig#regionId()。

  4. groupName:組名稱,traceType為Pub時表示生產(chǎn)者組的名稱,traceType為subBefore或subAfter時表示消費組名稱。

  5. requestId:在traceType為subBefore、subAfter時使用,消

    費端的請求ID。

  6. contextCode:消費狀態(tài)碼,可選值為SUCCESS、TIME_OUT、EXCEPTION、RETURNNULL、FAILED。

2> 針對消息信息采用TraceBean類維護;

public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;
}
  1. topic:消息主題
  2. msgId:消息唯一ID
  3. offsetMsgId:消息偏移量ID,該ID中包含了Broker的IP以及偏移量
  4. tags:消息標(biāo)志
  5. keys:消息索引key,根據(jù)該key可快速檢索消息
  6. storeHost:跟蹤類型為Pub時存儲該消息的Broker服務(wù)器IP,跟蹤類型為subBefore、subAfter時存儲消費者IP
  7. bodyLength:消息體的長度
  8. msgType:消息的類型,可選值為Normal_Msg(普通消息)、Trans_Msg_Half(預(yù)提交消息)、Trans_msg_Commit(提交消息)、Delay_Msg(延遲消息)

2)軌跡消息存儲

RocketMQ選擇將消息軌跡數(shù)據(jù)當(dāng)作一條消息,存儲在Broker服務(wù)器中。

RocketMQ提供了兩種方式來定義消息軌跡存儲的topic:

  1. 系統(tǒng)默認topic:如果Broker的traceTopicEnable配置項設(shè)為true,表示在該Broker上創(chuàng)建名為RMQ_SYS_TRACE_TOPIC的topic,隊列個數(shù)為1,默認該值為false。
  2. 自定義topic:在創(chuàng)建消息生產(chǎn)者或消息消費者時,可以通過參數(shù)自定義用于記錄消息軌跡的topic名稱;
    • 注意:RokcetMQ控制臺(rocketmq-console)中只支持配置一個消息軌跡topic,建議使用系統(tǒng)默認的topic。

另外:為了避免消息軌跡的數(shù)據(jù)與正常的業(yè)務(wù)數(shù)據(jù)混在一起,官方建議單獨使用一個Broker用于開啟消息軌跡跟蹤。消息軌跡數(shù)據(jù)只會發(fā)送到這一臺Broker服務(wù)器上,不影響原業(yè)務(wù)Broker的負載壓力。

4、如何采集消息軌跡數(shù)據(jù)

MQ的核心操作是消息發(fā)送和消息存儲,數(shù)據(jù)載體為消息。消息軌跡主要是記錄消息何時發(fā)送到哪臺Broker?發(fā)送耗時是多少?在什么時候被哪個消費者消費?消費耗時是多少?…

1)消息發(fā)送

在消息發(fā)送前后RocketMQ會將本次調(diào)用的信息進行采集。RocketMQ通過提供消息發(fā)送鉤子函數(shù)(SendMessageHook)實現(xiàn),并且為了不明顯增加消息發(fā)送的時間延遲,使用異步的方式記錄消息軌跡。

1> 實例化Producer

在案例中我們知道在實例化DefaultMQProducer時,需要將入?yún)nableMsgTrace設(shè)置為true,才能開啟消息軌跡。當(dāng)enableMsgTrace為true時,看DefaultMQProducer的構(gòu)造函數(shù):

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
                         boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

其中:

  • SendMessageTraceHookImpl:消息發(fā)送的鉤子函數(shù),用于跟蹤消息軌跡;
    • 對應(yīng)的消息消費鉤子函數(shù)為:ConsumeMessageTraceHookImpl
  • AsyncTraceDispatcher:消息軌跡異步轉(zhuǎn)發(fā)器。用于在消息生產(chǎn)時,異步將消息軌跡保存到特定topic(默認為RMQ_SYS_TRACE_TOPIC
2> Producer發(fā)送消息

在DefaultMqProducerImple#sendKernelImpl方法中,會首先判斷是否有發(fā)送消息鉤子函數(shù)(SendMessageHook);如果有:

  • 在發(fā)送消息之前調(diào)用鉤子函數(shù)SendMessageHook#sendMessageBefore()方法,將消息軌跡數(shù)據(jù)先存儲在調(diào)用上下文中。
  • 在發(fā)送消息之后調(diào)用鉤子函數(shù)SendMessageHook#sendMessageAfter()方法,使用AsyncTraceDispatcher異步將消息軌跡數(shù)據(jù)發(fā)送到消息服務(wù)器(Broker)上。

rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

無論是DefaultMQProducerImpl中的executeSendMessageHookBefore()方法還是executeSendMessageHookAfter()方法,內(nèi)部都是調(diào)用所有SendMessageHook的相應(yīng)before()、after()方法。

public void executeSendMessageHookBefore(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageBefore(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookBefore", e);
            }
        }
    }
}

public void executeSendMessageHookAfter(final SendMessageContext context) {
    if (!this.sendMessageHookList.isEmpty()) {
        for (SendMessageHook hook : this.sendMessageHookList) {
            try {
                hook.sendMessageAfter(context);
            } catch (Throwable e) {
                log.warn("failed to executeSendMessageHookAfter", e);
            }
        }
    }
}
<1> sendMessageBefore()
@Override
public void sendMessageBefore(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();
    traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
    tuxeContext.getTraceBeans().add(traceBean);
}

方法邏輯很簡單:只是在消息發(fā)送之前先收集消息的topic、tag、key、存儲Broker的IP地址、消息體的長度等基礎(chǔ)信息,并將消息軌跡數(shù)據(jù)先存儲在調(diào)用上下文中。

<2> sendMessageAfter()
@Override
public void sendMessageAfter(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }

    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        // if switch is false,skip it
        return;
    }

    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
    tuxeContext.setCostTime(costTime);
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);
}

消息發(fā)送到Broker返回之后會調(diào)用到sendMessageAfter()方法。

  1. 進行一些校驗:

    • 如果消息發(fā)送上下為空 或 發(fā)送消息結(jié)果為空,則直接返回,不記錄消息軌跡數(shù)據(jù)。
  2. 進一步維護消息軌跡數(shù)據(jù);

    • 從MqTraceContext中獲取跟蹤的TraceBean,雖然TraceContext中將TraceBean設(shè)計成List結(jié)構(gòu),但在消息發(fā)送時,這里的數(shù)據(jù)永遠只有一條,即使是批量發(fā)送。

    • 設(shè)置costTime(消息發(fā)送耗時)、success(是否發(fā)送成功)、regionId(發(fā)送到Broker所在的分區(qū))、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,則是最后一條消息的物理偏移量)、storeTime。

    • 注意:storeTime并不是真實的消息存儲時間,而是一個估算值,取自:客戶端發(fā)送消息耗時的一半。

      traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
      
  3. 關(guān)鍵點:消息軌跡的異步發(fā)送;

    • 將消息軌跡添加到AsyncTraceDispatcher中的阻塞隊列traceContextQueue中,以供后續(xù)異步發(fā)送消息軌跡使用;

      rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

    • 在實例化Producer時,如果開啟了消息軌跡,會實例化AsyncTraceDispatcher;并且在啟動Producer時也會啟動AsyncTraceDispatcher

      • 最終目的是從阻塞隊列traceContextQueue中找到待異步發(fā)送的軌跡消息,然后發(fā)送到相應(yīng)Broker中。

      rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

<3> 消息軌跡異步發(fā)送

進一步看AsyncTraceDispatcher#start()方法:

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

其中啟動一個后臺線程,線程的具體邏輯體現(xiàn)在AsyncRunnable中。

class AsyncRunnable implements Runnable {
        private boolean stopped;

        @Override
        public void run() {
            while (!stopped) {
                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        //get trace data element from blocking Queue — traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }

        }
    }
  • 從待發(fā)送隊列traceContextQueue中不斷獲取消息軌跡的數(shù)據(jù),并將其異步發(fā)送到消息服務(wù)器。
  • 為了提高消息的發(fā)送效率引入批量機制,即:一次從隊列中獲取一批消息
    • 然后封裝成AsyncAppenderRequest任務(wù)并提交到線程池中異步執(zhí)行;
    • 真正的發(fā)送消息軌跡數(shù)據(jù)的邏輯被封裝在AsyncTraceDispatcher的內(nèi)部類AsyncAppenderRequest#run()方法中。
class AsyncAppenderRequest implements Runnable {
    List<TraceContext> contextList;

    public AsyncAppenderRequest(final List<TraceContext> contextList) {
        if (contextList != null) {
            this.contextList = contextList;
        } else {
            this.contextList = new ArrayList<TraceContext>(1);
        }
    }

    @Override
    public void run() {
        sendTraceData(contextList);
    }

    public void sendTraceData(List<TraceContext> contextList) {
        Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
        for (TraceContext context : contextList) {
            if (context.getTraceBeans().isEmpty()) {
                continue;
            }
            // Topic value corresponding to original message entity content
            String topic = context.getTraceBeans().get(0).getTopic();
            String regionId = context.getRegionId();
            // Use  original message entity's topic as key
            String key = topic;
            if (!StringUtils.isBlank(regionId)) {
                key = key + TraceConstants.CONTENT_SPLITOR + regionId;
            }
            List<TraceTransferBean> transBeanList = transBeanMap.get(key);
            if (transBeanList == null) {
                transBeanList = new ArrayList<TraceTransferBean>();
                transBeanMap.put(key, transBeanList);
            }
            TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
            transBeanList.add(traceData);
        }
        for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
            String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
            String dataTopic = entry.getKey();
            String regionId = null;
            if (key.length > 1) {
                dataTopic = key[0];
                regionId = key[1];
            }
            flushData(entry.getValue(), dataTopic, regionId);
        }
    }
  • 按照消息軌跡的存儲協(xié)議對消息軌跡內(nèi)容進行組裝、編碼;
  • 按照topic分批調(diào)用flushData()方法將消息發(fā)送到Broker中,完成消息軌跡數(shù)據(jù)的存儲。

2)消息消費

同樣,在消息消費前后RocketMQ會將本次調(diào)用的信息進行采集。RocketMQ通過提供消息消費鉤子函數(shù)(ConsumeMessageHook)實現(xiàn),并且為了不明顯增加消息消費的時間延遲,使用異步的方式記錄消息軌跡。

消息消費和消息發(fā)送是一樣的機制,這里就不冗余介紹了,貼幾個代碼截圖也就一目了然了。

  • 實例化消費者

    rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

  • 啟動消費者

    rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

  • 以并行消費消息為例:

    rocketmq 顯示message花費的時間,精通消息隊列MQ,rocketmq,spring cloud

    public void executeHookBefore(final ConsumeMessageContext context) {
            if (!this.consumeMessageHookList.isEmpty()) {
                for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                    try {
                        hook.consumeMessageBefore(context);
                    } catch (Throwable e) {
                    }
                }
            }
        }
    
        public void executeHookAfter(final ConsumeMessageContext context) {
            if (!this.consumeMessageHookList.isEmpty()) {
                for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                    try {
                        hook.consumeMessageAfter(context);
                    } catch (Throwable e) {
                    }
                }
            }
        }
    

三、總結(jié)

消息軌跡其實就是記錄消息從發(fā)送 到 存儲 再到 消費,整個消息生命周期中的一些關(guān)鍵信息,比如:誰發(fā)送的、發(fā)送耗時多久、消息保存在哪了、誰消費了、消費耗時多久。

在RocketMQ中的實現(xiàn)方式也很簡單,在消息發(fā)送/消費前后基于鉤子函數(shù),做before()、after()邏輯,進而記錄消息軌跡信息。

特別注意:storeTime并不是真實的消息存儲時間,而是一個估算值,取自:客戶端發(fā)送消息耗時的一半。

消息軌跡功能涉及到的關(guān)鍵類:文章來源地址http://www.zghlxwxcb.cn/news/detail-795973.html

  • AsyncTraceDispatcher:負責(zé)異步發(fā)送消息軌跡到Broker。
  • ConsumeMessageHook:消費消息鉤子函數(shù)
  • SendMessageHook:生產(chǎn)消息鉤子函數(shù)
  • TraceContext、TraceBean:兩者一起用于表述消息軌跡

到了這里,關(guān)于RocketMQ如何實現(xiàn)消息軌跡:消息何時發(fā)送的?耗時多久?誰消費的?存在哪個broker了?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RocketMQ 發(fā)送批量消息、過濾消息和事務(wù)消息

    RocketMQ 發(fā)送批量消息、過濾消息和事務(wù)消息

    前面我們知道RocketMQ 發(fā)送延時消息與順序消息,現(xiàn)在我們看下怎么發(fā)送批量消息、過濾消息和事務(wù)消息。 限制是這些批量消息應(yīng)該有相同的 topic,相同的 waitStoreMsgOK,而且不能是延時消息。 此外,這一批消息的總大小不應(yīng)超過4MB。 消息的生產(chǎn)者 消息的消費者 消息分割 如果

    2023年04月21日
    瀏覽(20)
  • RocketMQ發(fā)送消息失敗排查

    RocketMQ發(fā)送消息失敗排查

    錯誤信息: 錯誤截圖: 查看結(jié)果: 說明:發(fā)現(xiàn)對應(yīng)的訂閱組已經(jīng)離線(查看對應(yīng)的項目MQ地址和配置都是正確的),然后從服務(wù)日志中也看不出更多的問題 說明:調(diào)整服務(wù)日志級別到info,通過詳細的日志信息定位發(fā)送失敗的原因 日志截圖: 說明:日志不斷打印 closeChanne

    2024年02月04日
    瀏覽(25)
  • RocketMQ發(fā)送消息超時異常

    RocketMQ發(fā)送消息超時異常

    說明:在使用RocketMQ發(fā)送消息時,出現(xiàn)下面這個異常(org.springframework.messging.MessgingException:sendDefaultImpl call timeout……); 解決:修改RocketMQ中broke.conf配置,添加下面這兩行配置,重啟服務(wù)后再試就可以了; 啟動時,注意使用下面的命令,帶上配置文件

    2024年02月13日
    瀏覽(22)
  • RocketMQ發(fā)送消息

    RocketMQ發(fā)送消息

    目錄 一.消費模式?編輯 二.發(fā)送消息 1.普通消息 同步消息(***)? 異步消息(***) 單向消息(*) 日志服務(wù)的編寫思路 2.延遲消息(***) 延遲等級? 3.批量消息 4.順序消息(*) 三.Tag過濾 訂閱關(guān)系的一致性 ①訂閱一個Topic且訂閱一個Tag ②訂閱一個Topic且訂閱多個Tag ③訂閱多個Topic且訂閱多

    2024年02月11日
    瀏覽(23)
  • rocketMQ-console 發(fā)送消息

    rocketMQ-console 發(fā)送消息

    rocketMQ-console是一款非常使用的rocketMQ擴展工具 工具代碼倉 mirrors / apache / rocketmq-externals · GitCode 安裝詳細教程 ??????rocketMQ學(xué)習(xí)筆記二:RocketMQ-Console安裝、使用詳解_麥田里的碼農(nóng)-CSDN博客_rocketmq-consoled 直接來到工具頁面 ,右上角可以切換語言 發(fā)送消息流程 1.點擊 最

    2024年02月14日
    瀏覽(19)
  • 13.RocketMQ之消息的存儲與發(fā)送

    13.RocketMQ之消息的存儲與發(fā)送

    分布式隊列因為有高可靠性的要求,所以數(shù)據(jù)要進行持久化存儲。 消息生成者發(fā)送消息 Broker收到消息,將消息進行持久化,在存儲中新增一條記錄 返回ACK給生產(chǎn)者 Broker消息給對應(yīng)的消費者,然后等待消費者返回ACK 如果消息消費者在指定時間內(nèi)成功返回ack,那么MQ認為消息消

    2024年02月11日
    瀏覽(22)
  • [RocketMQ] Producer發(fā)送消息的總體流程 (七)

    [RocketMQ] Producer發(fā)送消息的總體流程 (七)

    單向發(fā)送: 把消息發(fā)向Broker服務(wù)器, 不管Broker是否接收, 只管發(fā), 不管結(jié)果。 同步發(fā)送: 把消息發(fā)向Broker服務(wù)器, 如果Broker成功接收, 可以得到Broker的響應(yīng)。 異步發(fā)送: 把消息發(fā)向Broker服務(wù)器, 如果Broker成功接收, 可以得到Broker的響應(yīng)。異步所以發(fā)送消息后, 不用等待, 等到Broker服

    2024年02月11日
    瀏覽(21)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 來配置發(fā)送和消費 RocketMQ 消息

    ? ? ? ?本文解析將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細節(jié),然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發(fā)送和消費 RocketMQ 消息。 添加maven依賴: 修改application.properties 注意: 請將上述示例配置中的 127.0.0.1:9876 替換

    2024年03月22日
    瀏覽(29)
  • RocketMQ教程-(5)-功能特性-消息發(fā)送重試和流控機制

    本文為您介紹 Apache RocketMQ 的消息發(fā)送重試機制和消息流控機制。 消息發(fā)送重試 Apache RocketM Q的消息發(fā)送重試機制主要為您解答如下問題: 部分節(jié)點異常是否影響消息發(fā)送? 請求重試是否會阻塞業(yè)務(wù)調(diào)用? 請求重試會帶來什么不足? 消息流控 Apache RocketMQ 的流控機制主要為

    2024年02月15日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包