国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RocketMQ如何安全的批量發(fā)送消息?

這篇具有很好參考價值的文章主要介紹了RocketMQ如何安全的批量發(fā)送消息?。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

優(yōu)點:

批量發(fā)送消息可以提高rocketmq的生產(chǎn)者性能和吞吐量。

使用場景:

  1. 發(fā)送大量小型消息時;
  2. 需要降低消息發(fā)送延遲時;
  3. 需要提高生產(chǎn)者性能時;

注意事項:

  1. 消息列表的大小不能超過broker設(shè)置的最大消息大小;
  2. 消息列表的大小不能超過生產(chǎn)證設(shè)置的maxMessageSize 參數(shù),此參數(shù)默認為 4MB;
  3. 批量發(fā)送消息不支持消息事務;
  4. 如果代碼在發(fā)送消息列表時發(fā)生異常,則可能會發(fā)生部分消息發(fā)送成功,部分消息發(fā)送失敗的情況。如果要確保所有消息都已成功發(fā)送,則需要增加錯誤處理邏輯和消息重試機制;

批量發(fā)送消息為什么要限制maxMessageSize?

消息列表的大小不能超過生產(chǎn)者設(shè)置的maxMessageSize參數(shù),主要是為了避免消息發(fā)送延遲和消息過大導致broker出現(xiàn)性能問題。如果嘗試發(fā)送大于maxMessageSize的消息,RocketMQ會拋出MessageTooLargeException異常,并且消息不會被發(fā)送到broker。

如果開發(fā)者在開發(fā)時遇到了消息列表大小超過maxMessageSize的情況,可以考慮以下幾種處理方式:文章來源地址http://www.zghlxwxcb.cn/news/detail-773690.html

    1. 提升maxMessageSize參數(shù)的大小,這樣可以容納更大的消息列表。但是,需要注意在提升參數(shù)大小時,要考慮到RocketMQ broker的性能和網(wǎng)絡帶寬等因素。
    2. 考慮將消息列表進行拆分,然后分批發(fā)送。這樣可以避免一次發(fā)送過多的消息。
    3. 計算消息的大小并進行壓縮??梢允褂靡恍嚎s算法,如 LZ4、Snappy 等,對消息進行壓縮,以減小消息的大小。
    4. 對超過 maxMessageSize 的消息進行過濾或其他處理。可以通過業(yè)務邏輯對消息進行分組或分類,對超過 maxMessageSize 的消息進行過濾或其他處理,以避免發(fā)送超出限制的消息。

代碼實現(xiàn)

package com.resource.sync.rocketmq;


import java.util.Iterator;
import java.util.List;

/**
 * @description:消息分割,在rocketmq中,一次性發(fā)送消息的長度不可超過4mb,此時我們需要進行切割,確保消息長度小于4mb
 **/
public class ListSplitter<T> implements Iterator<List<T>> {
    /**
     * 分割數(shù)據(jù)大小
     */
    private int sizeLimit;

    /**
     * 分割數(shù)據(jù)列表
     */
    private final List<T> messages;

    /**
     * 分割索引
     */
    private int currIndex;

    public ListSplitter(int sizeLimit, List<T> messages) {
        this.sizeLimit = sizeLimit;
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<T> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            T t = messages.get(nextIndex);
            totalSize = totalSize + t.toString().length();
            if (totalSize > sizeLimit) {
                break;
            }
        }
        List<T> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
    private final int maxMessageSize = 1024 * 1024 * 4;

	/**
     * 消息分割(批量發(fā)送)
     */
    private void bulkSendMsg(List<Message<String>> messageList) {
        // 限制數(shù)據(jù)大小
        ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);
        while (splitter.hasNext()) {
            List<Message> nextList = splitter.next();
            syncBulkSendMessage("topic", nextList);
        }
    }


    /**
     * @param topic
     * @param list
     * @description:發(fā)送實時消息(批量)
     */
    public void syncBulkSendMessage(String topic, List<Message> list) {
        SendResult sendResult = null;
        try {
            sendResult = rocketMQTemplate.syncSend(topic, list);
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());
            }
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());
            }

        } catch (Exception e) {
            log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);
        }
    }

到了這里,關(guān)于RocketMQ如何安全的批量發(fā)送消息?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • RocketMQ發(fā)送消息超時異常

    RocketMQ發(fā)送消息超時異常

    說明:在使用RocketMQ發(fā)送消息時,出現(xiàn)下面這個異常(org.springframework.messging.MessgingException:sendDefaultImpl call timeout……); 解決:修改RocketMQ中broke.conf配置,添加下面這兩行配置,重啟服務后再試就可以了; 啟動時,注意使用下面的命令,帶上配置文件

    2024年02月13日
    瀏覽(22)
  • RocketMQ發(fā)送消息

    RocketMQ發(fā)送消息

    目錄 一.消費模式?編輯 二.發(fā)送消息 1.普通消息 同步消息(***)? 異步消息(***) 單向消息(*) 日志服務的編寫思路 2.延遲消息(***) 延遲等級? 3.批量消息 4.順序消息(*) 三.Tag過濾 訂閱關(guān)系的一致性 ①訂閱一個Topic且訂閱一個Tag ②訂閱一個Topic且訂閱多個Tag ③訂閱多個Topic且訂閱多

    2024年02月11日
    瀏覽(23)
  • rocketMQ-console 發(fā)送消息

    rocketMQ-console 發(fā)送消息

    rocketMQ-console是一款非常使用的rocketMQ擴展工具 工具代碼倉 mirrors / apache / rocketmq-externals · GitCode 安裝詳細教程 ??????rocketMQ學習筆記二:RocketMQ-Console安裝、使用詳解_麥田里的碼農(nóng)-CSDN博客_rocketmq-consoled 直接來到工具頁面 ,右上角可以切換語言 發(fā)送消息流程 1.點擊 最

    2024年02月14日
    瀏覽(20)
  • 13.RocketMQ之消息的存儲與發(fā)送

    13.RocketMQ之消息的存儲與發(fā)送

    分布式隊列因為有高可靠性的要求,所以數(shù)據(jù)要進行持久化存儲。 消息生成者發(fā)送消息 Broker收到消息,將消息進行持久化,在存儲中新增一條記錄 返回ACK給生產(chǎn)者 Broker消息給對應的消費者,然后等待消費者返回ACK 如果消息消費者在指定時間內(nèi)成功返回ack,那么MQ認為消息消

    2024年02月11日
    瀏覽(22)
  • [RocketMQ] Producer發(fā)送消息的總體流程 (七)

    [RocketMQ] Producer發(fā)送消息的總體流程 (七)

    單向發(fā)送: 把消息發(fā)向Broker服務器, 不管Broker是否接收, 只管發(fā), 不管結(jié)果。 同步發(fā)送: 把消息發(fā)向Broker服務器, 如果Broker成功接收, 可以得到Broker的響應。 異步發(fā)送: 把消息發(fā)向Broker服務器, 如果Broker成功接收, 可以得到Broker的響應。異步所以發(fā)送消息后, 不用等待, 等到Broker服

    2024年02月11日
    瀏覽(21)
  • SpringBoot集成RocketMQ實現(xiàn)三種消息發(fā)送方式

    SpringBoot集成RocketMQ實現(xiàn)三種消息發(fā)送方式

    目錄 一、pom文件引入依賴 二、application.yml文件添加內(nèi)容 三、創(chuàng)建producer生產(chǎn)者 四、創(chuàng)建Consumer消費者(創(chuàng)建兩個消費者,所屬一個Topic) 五、啟動項目測試 RocketMQ 支持3 種消息發(fā)送方式: 同步 (sync)、異步(async)、單向(oneway)。 同步 :發(fā)送者向 MQ 執(zhí)行發(fā)送消息API 時

    2024年02月13日
    瀏覽(22)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 來配置發(fā)送和消費 RocketMQ 消息

    ? ? ? ?本文解析將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細節(jié),然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發(fā)送和消費 RocketMQ 消息。 添加maven依賴: 修改application.properties 注意: 請將上述示例配置中的 127.0.0.1:9876 替換

    2024年03月22日
    瀏覽(29)
  • RocketMQ教程-(5)-功能特性-消息發(fā)送重試和流控機制

    本文為您介紹 Apache RocketMQ 的消息發(fā)送重試機制和消息流控機制。 消息發(fā)送重試 Apache RocketM Q的消息發(fā)送重試機制主要為您解答如下問題: 部分節(jié)點異常是否影響消息發(fā)送? 請求重試是否會阻塞業(yè)務調(diào)用? 請求重試會帶來什么不足? 消息流控 Apache RocketMQ 的流控機制主要為

    2024年02月15日
    瀏覽(27)
  • 如何使用 Java 發(fā)送消息到 RabbitMQ 中的隊列

    RabbitMQ是一個強大的消息隊列中間件,可以實現(xiàn)高效的消息傳遞和解耦。在實際應用中,我們還可以使用更多高級特性,如消息持久化、消息確認機制、消息路由策略等,以滿足復雜的業(yè)務需求。本文將介紹如何在Spring Boot應用程序中集成RabbitMQ,并實現(xiàn)一個簡單的消息發(fā)送和

    2024年03月14日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包