国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

消息中間件-RocketMQ

這篇具有很好參考價(jià)值的文章主要介紹了消息中間件-RocketMQ。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

????????RocketMQ是阿里巴巴開源的消息分布中間件,在阿里內(nèi)部使用非常更廣泛,已經(jīng)經(jīng)過了“雙11”這種萬億級(jí)的應(yīng)用場(chǎng)景考驗(yàn)。

1.安裝

????????下載地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

????????下載完成后解壓縮安裝包到指定目錄。

2.配置

? ?????????(1)環(huán)境變量:ROCKETMQ_HOME

??????????????消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

? ? ? ? ? ?(2)Path

??????????????消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

3.啟動(dòng)

? ? ? ? ? 切換到當(dāng)前下載RocketMQ的bin目錄下

????????(1)啟動(dòng)NameServer ---> start mqnamesrv.cmd

消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

? ? ? ? ? ? ??消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

????????(2)啟動(dòng)Broker --->?? start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

4.RocketMQ的架構(gòu)及概念

? ? ? ? 看下面這張圖

????????消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

?文章來源地址http://www.zghlxwxcb.cn/news/detail-708056.html

? ? ? ? RocketMQ整體分為4個(gè)角色,NameServer、Broker、Producer、Consumer、

? ? ? ? Broker:為RocketMQ的核心,負(fù)責(zé)消息的接受,存儲(chǔ),投遞等功能。

? ? ? ? NameServer:消息隊(duì)列的協(xié)商者,Broker向它注冊(cè)路由信息,同時(shí)Producer和Consumer向其獲取路由信息。

? ? ? ? Producer:消息的生產(chǎn)者,需要從NameServer獲取Borker信息,然后與Broker建立連接,向Broker發(fā)送消息。

? ? ? ? Consumer:消息的消費(fèi)者,需要從NameServer獲取Broker信息,然后與Broker建立連接,從Broker獲取消息。

? ? ? ? 另外還包括別的組件,

? ? ? ? Topic:用來區(qū)分不同的消息類型,發(fā)送和接收消息前都要先創(chuàng)建Topic,針對(duì)Topic來發(fā)送和接收消息。

? ? ? ? Message Queue:消息隊(duì)列,一個(gè)Topic可以設(shè)置一個(gè)或多個(gè)MessageQueue,這樣消息就可以并行往各個(gè)Message Queue發(fā)送消息,消費(fèi)者也可以并行的從多個(gè)Message Queue讀取消息,提高性能和吞吐量。

? ? ? ? Message:消息的載體。

5.消息發(fā)送和接收(應(yīng)用)

? ? ? ?搭建好SpringBoot項(xiàng)目后,先導(dǎo)入RocketMQ所需的依賴:

 <!--MQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

5.1 同步發(fā)送? ? ? ??

? ? ? ? 同步發(fā)送方式比較可靠,應(yīng)用也比較廣泛,比如:重要的消息通知,短信通知。

消息發(fā)送方:

//發(fā)送同步消息
public class RocketMQSendTest1 {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        //1.創(chuàng)建消息生產(chǎn)者,指定生產(chǎn)者所屬的組名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3.啟動(dòng)生產(chǎn)者
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("myTopic", "myTag", ("十行代碼九個(gè)錯(cuò)誤八個(gè)警告竟敢說七日精通六天學(xué)會(huì)五湖四海也不見如此三心二意之程序簡(jiǎn)直一等下流" + i).getBytes());
            //發(fā)送消息
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        //關(guān)閉生產(chǎn)者
        producer.shutdown();
    }
}

消息接收方:

//接收消息
public class RocketMQReceiveTest1 {
    public static void main(String[] args) throws MQClientException {
        //創(chuàng)建消息消費(fèi)者。指定消費(fèi)者所屬的組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
        //指定Nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //指定消費(fèi)者訂閱的主題和標(biāo)簽
        consumer.subscribe("myTopic", "*");

        //設(shè)置回調(diào)函數(shù),編寫處理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("獲取到的消費(fèi)數(shù)據(jù):" + list);
                System.out.println(new String(list.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //啟動(dòng)消息消費(fèi)者
        consumer.start();
        System.out.println("Consumer Starting.");
    }
}

? ? ? ? 啟動(dòng)時(shí),切記先啟動(dòng)消費(fèi)者(接收方),再啟動(dòng)生產(chǎn)者(發(fā)送方)。

????????啟動(dòng)測(cè)試,同步發(fā)送消息結(jié)果:?

????????消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

5.2 異步消息

????????異步消息通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,即發(fā)送端不能容忍長時(shí)間地等待Broker的響應(yīng)。

發(fā)送方:

//發(fā)送異步消息
//異步消息比較浪費(fèi)性能,經(jīng)常會(huì)失敗,所以多發(fā)送幾次并且讓線程休眠幾秒
public class RocketMQSendTest2 {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        //1.創(chuàng)建消息生產(chǎn)者,指定生產(chǎn)者所屬的組名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        //2.設(shè)置NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3.啟動(dòng)生產(chǎn)者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.創(chuàng)建消息對(duì)象,制定主題、標(biāo)簽、消息體
            Message message = new Message("myTopic", "myTag2", ("十行代碼九個(gè)錯(cuò)誤八個(gè)警告竟敢說七日精通六天學(xué)會(huì)五湖四海也不見如此三心二意之程序簡(jiǎn)直一等下流").getBytes());
            //5.發(fā)送消息
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("發(fā)送成功:" + sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println("發(fā)送異常:" + e);
                }
            });

            //休眠
            TimeUnit.SECONDS.sleep(3);
        }

        //6.關(guān)閉生產(chǎn)者
        producer.shutdown();

    }
}

接收方:

public class RocketMQReceiveTest2 {
    public static void main(String[] args) throws MQClientException {
        //1.創(chuàng)建消息消費(fèi)者,指定消費(fèi)者所屬的組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");

        //2.指定Nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //3.指定消費(fèi)者訂閱的主題和標(biāo)簽
        consumer.subscribe("myTopic", "*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.設(shè)置回調(diào)函數(shù),編寫處理請(qǐng)求的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                System.out.println(new String(list.get(0).getBody()));
                //返回消費(fèi)狀態(tài)
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.啟動(dòng)消息消費(fèi)者
        consumer.start();
        System.out.println("Consumer Starting.");

    }
}

啟動(dòng)測(cè)試,異步發(fā)送消息結(jié)果:?

消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

5.3 單行發(fā)送消息

? ? ? 該方式用在不關(guān)注發(fā)送結(jié)果的場(chǎng)景,比如日志發(fā)送。

//單行發(fā)送消息
public class RocketMQSendTest3 {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("myTopic", "myTag3", ("蕪湖").getBytes());
            producer.sendOneway(message);

            TimeUnit.SECONDS.sleep(3);
        }
        producer.shutdown();
    }

}

啟動(dòng)兩個(gè)消費(fèi)者接收10條消息,結(jié)果如下:

消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

消息中間件-RocketMQ,rocketmq,java,java-rocketmq?

存在消息丟失的情況。

以上就是對(duì)RocketMQ的初步認(rèn)識(shí)啦!

?

到了這里,關(guān)于消息中間件-RocketMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 消息中間件之RocketMQ源碼分析(十)

    消息中間件之RocketMQ源碼分析(十)

    啟動(dòng)命令 nohup ./bin/mqnamesrv -c ./conf/namesrv.conf dev/null 21 通過腳本配置啟動(dòng)基本參數(shù),比如配置文件路徑、JVM參數(shù),調(diào)用NamesrvStartup.main()方法,解析命令行的參數(shù),將處理好的參數(shù)轉(zhuǎn)化為Java實(shí)例,傳遞給NamesrvController實(shí)例 加載命令行傳遞的配置參數(shù),調(diào)用controller.initialize()方法初

    2024年02月20日
    瀏覽(21)
  • 【消息中間件】RocketMQ消息重復(fù)消費(fèi)場(chǎng)景及解決辦法

    【消息中間件】RocketMQ消息重復(fù)消費(fèi)場(chǎng)景及解決辦法

    消息重復(fù)消費(fèi)是各個(gè)MQ都會(huì)發(fā)生的常見問題之一,在一些比較敏感的場(chǎng)景下,重復(fù)消費(fèi)會(huì)造成比較嚴(yán)重的后果,比如重復(fù)扣款等。 當(dāng)系統(tǒng)的調(diào)用鏈路比較長的時(shí)候,比如系統(tǒng)A調(diào)用系統(tǒng)B,系統(tǒng)B再把消息發(fā)送到RocketMQ中,在系統(tǒng)A調(diào)用系統(tǒng)B的時(shí)候,如果系統(tǒng)B處理成功,但是遲遲

    2024年02月05日
    瀏覽(41)
  • 分布式消息中間件RocketMQ的應(yīng)用

    分布式消息中間件RocketMQ的應(yīng)用

    所有代碼同步至GitCode:https://gitcode.net/ruozhuliufeng/test-rocketmq.git 普通消息 消息發(fā)送分類 ? Producer對(duì)于消息的發(fā)送方式也有多種選擇,不同的方式會(huì)產(chǎn)生不同的系統(tǒng)效果。 同步發(fā)送消息 ? 同步發(fā)送消息是指,Producer發(fā)出一條消息后,會(huì)在收到MQ返回的ACK之后才發(fā)下一條消息。

    2024年02月05日
    瀏覽(21)
  • ActiveMQ、RabbitMQ、Kafka、RocketMQ消息中間件技術(shù)選型

    消息中間件是分布式系統(tǒng)中重要的組件之一,用于實(shí)現(xiàn)異步通信、解耦系統(tǒng)、提高系統(tǒng)可靠性和擴(kuò)展性。在做消息中間件技術(shù)選型時(shí),需要考慮多個(gè)因素,包括可靠性、性能、可擴(kuò)展性、功能豐富性、社區(qū)支持和成本等。本文將五種流行的消息中間件技術(shù):ActiveMQ、RabbitMQ、

    2024年02月11日
    瀏覽(22)
  • 【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    作者簡(jiǎn)介 前言 博主之前寫過一個(gè)完整的MQ系列,包含RabbitMQ、RocketMQ、Kafka,從安裝使用到底層機(jī)制、原理。專欄地址: https://blog.csdn.net/joker_zjn/category_12142400.html?spm=1001.2014.3001.5482 本文是該系列的清單綜述,會(huì)拉通來聊一下三大MQ的特點(diǎn)和各種適合的場(chǎng)景。 目錄 1.概述 1.1.M

    2024年02月09日
    瀏覽(53)
  • SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的發(fā)送方:生產(chǎn)者 消息的接收方:消費(fèi)者 同步消息:發(fā)送方發(fā)送消息到接收方,接收方有所回應(yīng)后才能夠進(jìn)行下一次的消息發(fā)送 異步消息:不需要接收方回應(yīng)就可以進(jìn)行下一步的發(fā)送 什么是消息隊(duì)列? 當(dāng)此時(shí)有很多個(gè)用戶同時(shí)訪問服務(wù)器,需要服務(wù)器進(jìn)行操作,但此

    2024年04月27日
    瀏覽(53)
  • 消息中間件(MQ)對(duì)比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在構(gòu)建分布式系統(tǒng)時(shí),選擇適合的消息中間件是至關(guān)重要的決策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是當(dāng)前流行的消息中間件之一,它們各自具有獨(dú)特的特點(diǎn)和適用場(chǎng)景。本文將對(duì)這四種消息中間件進(jìn)行綜合比較,幫助您在項(xiàng)目中作出明智的選擇。 1. RabbitMQ 特點(diǎn): 消息模

    2024年02月20日
    瀏覽(34)
  • 【Alibaba中間件技術(shù)系列】「RocketMQ技術(shù)專題」RocketMQ消息發(fā)送的全部流程和落盤原理分析

    RocketMQ目前在國內(nèi)應(yīng)該是比較流行的MQ 了,目前本人也在公司的項(xiàng)目中進(jìn)行使用和研究,借著這個(gè)機(jī)會(huì),分析一下RocketMQ 發(fā)送一條消息到存儲(chǔ)一條消息的過程,這樣會(huì)對(duì)以后大家分析和研究RocketMQ相關(guān)的問題有一定的幫助。 分析的總體技術(shù)范圍發(fā)送到存儲(chǔ),本文的主要目的是

    2024年02月10日
    瀏覽(26)
  • 【Java中間件】RocketMQ

    【Java中間件】RocketMQ

    Message Queue,是一種提供消息隊(duì)列服務(wù)的中間件。提供了消息生產(chǎn)、存儲(chǔ)、消費(fèi)全過程API的軟件系統(tǒng)。 MQ的作用 限流削峰:當(dāng)用戶發(fā)送超量請(qǐng)求時(shí),將請(qǐng)求暫存,以便后期慢慢處理。如果不使用MQ暫存直接請(qǐng)求到業(yè)務(wù)系統(tǒng)中容易引起系統(tǒng)崩潰。 異步解耦:若上游系統(tǒng)和下游系

    2024年02月15日
    瀏覽(23)
  • Linux系統(tǒng)下消息中間件RocketMQ下載、安裝、搭建、配置、控制臺(tái)rocketmq-dashboard的安裝保姆級(jí)教程 rocketmq ui

    Linux系統(tǒng)下消息中間件RocketMQ下載、安裝、搭建、配置、控制臺(tái)rocketmq-dashboard的安裝保姆級(jí)教程 rocketmq ui

    這里給出我使用的 RocketMQ 版本(5.1.3)、RocketMQ-Dashboard 版本的百度網(wǎng)盤鏈接: 鏈接:https://pan.baidu.com/s/1HaKBBDGWZ0WKLGgVwIG9pw 提取碼:1234 1、注意:有兩種資源下載:Source表示源碼、Binary是二進(jìn)制包(我們下載這個(gè)):二進(jìn)制包是已經(jīng)編譯完成后可以直接運(yùn)行的,源碼包是需要

    2024年02月12日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包