国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RocketMQ的windos/linux/docker超詳細(xì)安裝及簡(jiǎn)單入門!

這篇具有很好參考價(jià)值的文章主要介紹了RocketMQ的windos/linux/docker超詳細(xì)安裝及簡(jiǎn)單入門!。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

RocketMQ簡(jiǎn)單入門

本文若有不當(dāng)之處歡迎提出pr/issue

主要內(nèi)容:

  1. 初識(shí)MQ

  2. RocketMQ簡(jiǎn)介

  3. RocketMQ安裝

  4. RocketMQ快速入門

  5. SpringBoot集成RocketMQ

  6. 最后

1.初識(shí)MQ

1.1.同步和異步通訊

微服務(wù)間通訊有同步和異步兩種方式:

同步通訊:就像打電話,需要實(shí)時(shí)響應(yīng)。

異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。

兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會(huì)有延遲。

1.1.1.同步通訊

Feign調(diào)用就屬于同步方式,雖然調(diào)用可以實(shí)時(shí)得到結(jié)果,但存在下面的問(wèn)題:

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

總結(jié):

同步調(diào)用的優(yōu)點(diǎn):

  • 時(shí)效性較強(qiáng),可以立即得到結(jié)果

同步調(diào)用的問(wèn)題:

  • 耦合度高
  • 性能和吞吐能力下降
  • 有額外的資源消耗
  • 有級(jí)聯(lián)失敗問(wèn)題(由于一個(gè)故障導(dǎo)致了連鎖反應(yīng),使得系統(tǒng)中的其他組件或節(jié)點(diǎn)也相繼失?。?/li>
1.1.2.異步通訊

異步調(diào)用則可以避免上述問(wèn)題:

我們以購(gòu)買商品為例,用戶支付后需要調(diào)用訂單服務(wù)完成訂單狀態(tài)修改,調(diào)用物流服務(wù),從倉(cāng)庫(kù)分配響應(yīng)的庫(kù)存并準(zhǔn)備發(fā)貨。

在事件模式中,支付服務(wù)是事件發(fā)布者(publisher),在支付完成后只需要發(fā)布一個(gè)支付成功的事件(event),事件中帶上訂單id。

訂單服務(wù)和物流服務(wù)是事件訂閱者(Consumer),訂閱支付成功的事件,監(jiān)聽(tīng)到事件后完成自己業(yè)務(wù)即可。

為了解除事件發(fā)布者與訂閱者之間的耦合,兩者并不是直接通信,而是有一個(gè)中間人(Broker)。發(fā)布者發(fā)布事件到Broker,不關(guān)心誰(shuí)來(lái)訂閱事件。訂閱者從Broker訂閱事件,不關(guān)心誰(shuí)發(fā)來(lái)的消息。

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

Broker 是一個(gè)像數(shù)據(jù)總線一樣的東西,所有的服務(wù)要接收數(shù)據(jù)和發(fā)送數(shù)據(jù)都發(fā)到這個(gè)總線上,這個(gè)總線就像協(xié)議一樣,讓服務(wù)間的通訊變得標(biāo)準(zhǔn)和可控。

好處:

  • 吞吐量提升:無(wú)需等待訂閱者處理完成,響應(yīng)更快速

  • 故障隔離:服務(wù)沒(méi)有直接調(diào)用,不存在級(jí)聯(lián)失敗問(wèn)題

  • 調(diào)用間沒(méi)有阻塞,不會(huì)造成無(wú)效的資源占用

  • 耦合度極低,每個(gè)服務(wù)都可以靈活插拔,可替換

  • 流量削峰:不管發(fā)布事件的流量波動(dòng)多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件

缺點(diǎn):

  • 架構(gòu)復(fù)雜了,業(yè)務(wù)沒(méi)有明顯的流程線,不好管理
  • 需要依賴于Broker的可靠、安全、性能

好在現(xiàn)在開(kāi)源軟件或云平臺(tái)上 Broker 的軟件是非常成熟的,比較常見(jiàn)的一種就是我們今天要學(xué)習(xí)的MQ技術(shù)。

1.2.技術(shù)對(duì)比:

MQ,中文是消息隊(duì)列(MessageQueue),字面來(lái)看就是存放消息的隊(duì)列。也就是事件驅(qū)動(dòng)架構(gòu)中的Broker。

幾種常見(jiàn)MQ的對(duì)比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社區(qū) Rabbit Apache 阿里 Apache
開(kāi)發(fā)語(yǔ)言 Erlang Java Java Scala&Java
協(xié)議支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協(xié)議 自定義協(xié)議
可用性 一般
單機(jī)吞吐量 一般 非常高
消息延遲 微秒級(jí) 毫秒級(jí) 毫秒級(jí) 毫秒以內(nèi)
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延遲:RabbitMQ、Kafka

不同的消息隊(duì)列系統(tǒng)在不同場(chǎng)景下有各自的優(yōu)勢(shì)和適用性。以下是各個(gè)消息隊(duì)列系統(tǒng)在不同場(chǎng)合下的最佳選擇:

  1. Kafka:
    • 最佳場(chǎng)合:大規(guī)模數(shù)據(jù)處理、實(shí)時(shí)日志收集和分析、流式處理。
    • 優(yōu)勢(shì):高吞吐量、低延遲、水平擴(kuò)展能力強(qiáng)、長(zhǎng)期消息存儲(chǔ),適合構(gòu)建大規(guī)模的實(shí)時(shí)數(shù)據(jù)流處理平臺(tái),如實(shí)時(shí)日志收集和分析、事件流處理等。
  2. RabbitMQ:
    • 最佳場(chǎng)合:傳統(tǒng)的企業(yè)級(jí)應(yīng)用、輕量級(jí)的消息傳遞場(chǎng)景。
    • 優(yōu)勢(shì):簡(jiǎn)單易用、支持多種消息協(xié)議、適合點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱模式,對(duì)于傳統(tǒng)的企業(yè)應(yīng)用和中小規(guī)模的消息傳遞需求,是一種可靠的選擇。
  3. ActiveMQ:
    • 最佳場(chǎng)合:中小規(guī)模的企業(yè)應(yīng)用、Java生態(tài)系統(tǒng)中的集成需求。
    • 優(yōu)勢(shì):Java開(kāi)發(fā)環(huán)境友好、支持多種消息協(xié)議,適合與Java生態(tài)系統(tǒng)的其他組件集成,如Spring框架等。
  4. RocketMQ:
    • 最佳場(chǎng)合:大規(guī)模的分布式系統(tǒng)、互聯(lián)網(wǎng)應(yīng)用、金融領(lǐng)域的消息處理。
    • 優(yōu)勢(shì):高吞吐量、低延遲、豐富的消息存儲(chǔ)模式,適用于處理大規(guī)模的消息傳遞場(chǎng)景,特別是在互聯(lián)網(wǎng)和金融領(lǐng)域。

綜合考慮以上因素,可以做如下簡(jiǎn)單總結(jié)

  • 如果需要處理大規(guī)模的實(shí)時(shí)數(shù)據(jù)流、日志收集和分析等高吞吐量場(chǎng)景,首選Kafka。
  • 如果對(duì)于消息傳遞的簡(jiǎn)單性和易用性有較高要求,適合中小規(guī)模的企業(yè)應(yīng)用和輕量級(jí)消息傳遞需求,可以選擇RabbitMQActiveMQ。
  • 如果在大規(guī)模的分布式系統(tǒng)、互聯(lián)網(wǎng)應(yīng)用或金融領(lǐng)域需要處理消息傳遞,RocketMQ是一個(gè)較好的選擇。

2.RocketMQ簡(jiǎn)介

官網(wǎng): http://rocketmq.apache.org/

RocketMQ是阿里巴巴2016年MQ中間件,使用Java語(yǔ)言開(kāi)發(fā),RocketMQ 是一款開(kāi)源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)。同時(shí),廣泛應(yīng)用于多個(gè)領(lǐng)域,包括異步通信解耦、企業(yè)解決方案、金融支付、電信、電子商務(wù)、快遞物流、廣告營(yíng)銷、社交、即時(shí)通信、移動(dòng)應(yīng)用、手游、視頻、物聯(lián)網(wǎng)、車聯(lián)網(wǎng)等。

RocketMQ的設(shè)計(jì)目標(biāo)是支持大規(guī)模消息處理,具有高并發(fā)、高可用和容錯(cuò)能力。它在多個(gè)方面提供了強(qiáng)大的功能和特性:

  1. 分布式架構(gòu):RocketMQ采用分布式架構(gòu),支持在多個(gè)節(jié)點(diǎn)之間進(jìn)行消息的發(fā)送和接收,實(shí)現(xiàn)了水平擴(kuò)展能力。
  2. 高吞吐量:RocketMQ可以在大規(guī)模并發(fā)場(chǎng)景下實(shí)現(xiàn)高吞吐量的消息處理,適用于高并發(fā)的業(yè)務(wù)場(chǎng)景。
  3. 低延遲:RocketMQ具有較低的消息傳遞延遲,適用于需要實(shí)時(shí)性的應(yīng)用場(chǎng)景。
  4. 消息可靠性:RocketMQ提供了多種消息存儲(chǔ)模式,可以確保消息的可靠傳遞,包括同步刷盤和異步刷盤等方式。
  5. 消息順序性:RocketMQ支持消息的順序傳遞,可以確保同一消息隊(duì)列中的消息按照發(fā)送順序被消費(fèi)。
  6. 支持多種消息模式:RocketMQ支持發(fā)布/訂閱模式和點(diǎn)對(duì)點(diǎn)模式,可以根據(jù)業(yè)務(wù)需求選擇合適的消息模式。
  7. 靈活的部署方式:RocketMQ支持多種部署方式,可以在單機(jī)上運(yùn)行,也可以搭建集群部署。
  8. 豐富的監(jiān)控和管理工具:RocketMQ提供了豐富的監(jiān)控和管理工具,方便管理員對(duì)消息隊(duì)列進(jìn)行監(jiān)控和管理。

核心概念

Producer:消息的發(fā)送者,生產(chǎn)者;舉例:發(fā)件人。

Consumer:消息接收者,消費(fèi)者;舉例:收件人。

Broker:消息隊(duì)列的中間服務(wù)器,負(fù)責(zé)存儲(chǔ)消息并將消息傳遞給消費(fèi)者;舉例:快遞。

NameServer:可以理解為是一個(gè)注冊(cè)中心,主要是用來(lái)保存topic路由信息,管理Broker。在NameServer的集群中,NameServer與NameServer之間是沒(méi)有任何通信的;舉例:各個(gè)快遞公司的管理機(jī)構(gòu)相當(dāng)于broker的注冊(cè)中心,保留了broker的信息。

Queue:隊(duì)列,消息存放的位置,一個(gè)Broker中可以有多個(gè)隊(duì)列。

Topic:消息的邏輯分類,生產(chǎn)者發(fā)送消息到指定的Topic,消費(fèi)者從指定的Topic訂閱消息。一個(gè)Topic可以有多個(gè)Producer和多個(gè)Consumer。

ProducerGroup:生產(chǎn)者組 。

ConsumerGroup:消費(fèi)者組,多個(gè)消費(fèi)者組可以同時(shí)消費(fèi)一個(gè)主題的消息。

工作流程

該部分轉(zhuǎn)載自 [掘金文章](RocketMQ保姆級(jí)教程 - 掘金 (juejin.cn))

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

通過(guò)這張圖就可以很清楚的知道,RocketMQ大致的工作流程:

  • Broker啟動(dòng)的時(shí)候,會(huì)往每臺(tái)NameServer(因?yàn)镹ameServer之間不通信,所以每臺(tái)都得注冊(cè))注冊(cè)自己的信息,這些信息包括自己的ip和端口號(hào),自己這臺(tái)Broker有哪些topic等信息。

  • Producer在啟動(dòng)之后會(huì)跟會(huì)NameServer建立連接,定期從NameServer中獲取Broker的信息,當(dāng)發(fā)送消息的時(shí)候,會(huì)根據(jù)消息需要發(fā)送到哪個(gè)topic去找對(duì)應(yīng)的Broker地址,如果有的話,就向這臺(tái)Broker發(fā)送請(qǐng)求;沒(méi)有找到的話,就看根據(jù)是否允許自動(dòng)創(chuàng)建topic來(lái)決定是否發(fā)送消息。

  • Broker在接收到Producer的消息之后,會(huì)將消息存起來(lái),持久化,如果有從節(jié)點(diǎn)的話,也會(huì)主動(dòng)同步給從節(jié)點(diǎn),實(shí)現(xiàn)數(shù)據(jù)的備份

  • Consumer啟動(dòng)之后也會(huì)跟會(huì)NameServer建立連接,定期從NameServer中獲取Broker和對(duì)應(yīng)topic的信息,然后根據(jù)自己需要訂閱的topic信息找到對(duì)應(yīng)的Broker的地址,然后跟Broker建立連接,獲取消息,進(jìn)行消費(fèi)

3.RocketMQ安裝

本文檔所涉及的是單機(jī)版的RocketMQ安裝教程,能夠滿足基本的學(xué)習(xí)使用,屬于入門級(jí)的教程,如果想要搭集群部署,可以參考其他資料,進(jìn)行配置即可

3.1.Windos下的安裝

所需環(huán)境

Windows 64位系統(tǒng)
JDK1.8(64位)
Maven

進(jìn)入[RocketMQ官網(wǎng)下載](下載 | RocketMQ (apache.org))

1、選擇Binary 下載

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

2、將壓縮包解壓至自定路徑

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

3、配置系統(tǒng)中的環(huán)境變量

變量名:ROCKETMQ_HOME

變量值:(如圖瀏覽目錄選擇指定bin-release文件夾路徑)

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

4.啟動(dòng)RocketMQ

在自己安裝的RocketMQ的bin目錄下執(zhí)行cmd命令,輸入下面命令,啟動(dòng)NameServer

start mqnamesrv.cmd

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

若出現(xiàn)如上圖所示的命令框,說(shuō)明啟動(dòng)成功,保留窗口切勿關(guān)閉

繼續(xù)啟動(dòng)broker

與上述同樣的路徑下呼出cmd,執(zhí)行如下命令:

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable = true

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

看到上述命令框彈出即完成對(duì)RocketMQ的啟動(dòng)。

注意:

RocketMQ默認(rèn)的虛擬機(jī)內(nèi)存較大,啟動(dòng)如果因?yàn)閮?nèi)存不足報(bào)錯(cuò)可執(zhí)行以下步驟:

用記事本打開(kāi)bin目錄下的 runbroker.cmd

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

  1. -Xms2g:設(shè)置JVM初始堆內(nèi)存大小為2GB。
  2. -Xmx2g:設(shè)置JVM最大堆內(nèi)存大小為2GB。

可修改為 -Xms256m -Xmx256m -Xmn128m

同理打開(kāi)runserver.cmd

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

修改jvm參數(shù)為

-Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

5.配置可視化頁(yè)面

下載可視化插件源碼

github下載地址:https://github.com/apache/rocketmq-dashboard

復(fù)制下載鏈接后使用git下載

可自建文件夾,進(jìn)入后使用git bash下載

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

git clone https://github.com/apache/rocketmq-dashboard.git

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

下載完成后,進(jìn)入application.yml中查看配置

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

保存后進(jìn)入到 …/rocketmq-dashboard目錄下,鼠標(biāo)右鍵進(jìn)入git控制臺(tái)

執(zhí)行 mvn clean package -Dmaven.test.skip=true

將該文件打包成jar包,該jar包保存在 該目錄的 target子目錄下

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

打包完成!

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

在 target子目錄下可找到對(duì)應(yīng)的jar包

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

在該目錄下打開(kāi)cmd,輸入指令==(請(qǐng)保證已經(jīng)運(yùn)行NameServer和broker)==:

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

成功執(zhí)行jar包

然后在網(wǎng)頁(yè)中訪問(wèn) http://127.0.0.1:8080/#/ 即可進(jìn)入rocketmq的圖形化界面

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

3.2.Linux下的安裝

請(qǐng)?zhí)崆霸O(shè)置服務(wù)器的防火墻,放通9876和10909(默認(rèn)的 RocketMQ Broker 端口號(hào))端口

進(jìn)入[RocketMQ官網(wǎng)下載](下載 | RocketMQ (apache.org))

1、選擇Binary 下載

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

2、在linux中創(chuàng)建RocketMQ文件夾

mkdir RocketMQ

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

3、將rocketmq-all-5.1.2-bin-release.zip壓縮文件上傳到linux服務(wù)器中

連接工具XSHELL - NetSarang Website

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

將壓縮包上傳到第2步創(chuàng)建的文件中

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

4、解壓zip包

cd ./RocketMQ/

unzip rocketmq-all-5.1.2-bin-release.zip

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

如果你的服務(wù)器沒(méi)有unzip命令,則下載安裝一個(gè)

yum install unzip

5、配置環(huán)境變量

vim /etc/profile

在文件末尾添加

export NAMESRV_ADDR=服務(wù)器IP:9876

6、修改腳本文件

修改目錄/root/RocketMQ/rocketmq-all-5.1.2-bin-release/bin下的配置文件: runserver.sh、runbroker.sh

修改runserver.sh 中原有內(nèi)存配置

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

修改runbroker.sh 中原有內(nèi)存配置

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改目錄/root/RocketMQ/rocketmq-all-5.1.2-bin-release/conf/broker.conf文件

在最后添加上

namesrvAddr = (服務(wù)器ip):9876
autoCreateTopicEnable=true
brokerIP1 = (服務(wù)器ip)

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

7、啟動(dòng)

進(jìn)入/root/RocketMQ/rocketmq-all-5.1.2-bin-release

首先在安裝目錄下創(chuàng)建一個(gè)logs文件夾,用于存放日志

mkdir logs

運(yùn)行兩條命令,啟動(dòng)NameServer和broker

nohup sh bin/mqnamesrv > ./logs/namesrv.log &

nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

運(yùn)行后可在logs文件夾下看到兩個(gè)日志文件

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

8.配置可視化頁(yè)面

前置步驟參考windows下的第5步5.配置可視化頁(yè)面

將jar包上傳到服務(wù)器的/root/RocketMQ中

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

然后在RockerMQ中運(yùn)行指令:

nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar rocketmq.config.namesrvAddr=127.0.0.1:9876

命令拓展:–server.port指定運(yùn)行的端口

–rocketmq.config.namesrvAddr=127.0.0.1:9876 指定namesrv地址

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

成功運(yùn)行!

最后訪問(wèn) 服務(wù)器ip:8080 即可訪問(wèn)到圖形化界面

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

3.3.docker安裝(推薦)
1、下載RockerMQ需要的鏡像

docker pull rocketmqinc/rocketmq

docker pull styletang/rocketmq-console-ng

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

2、啟動(dòng)NameServer服務(wù)

創(chuàng)建NameServer數(shù)據(jù)存儲(chǔ)路徑

mkdir -p /home/rocketmq/data/namesrv/logs /home/rocketmq/data/namesrv/store

啟動(dòng)NameServer容器

docker run -d --name rmqnamesrv -p 9876:9876 -v /home/rocketmq/data/namesrv/logs:/root/logs -v /home/rocketmq/data/namesrv/store:/root/store -e “MAX_POSSIBLE_HEAP=100000000” rocketmqinc/rocketmq sh mqnamesrv

這是一個(gè)Docker命令,用于在Docker容器中運(yùn)行RocketMQ Name Server(消息服務(wù)器)。讓我們逐步解釋這個(gè)命令:

bashCopy codedocker run -d \
--name rmqnamesrv \
-p 9876:9876 \
-v /home/rocketmq/data/namesrv/logs:/root/logs \
-v /home/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq sh mqnamesrv

解釋:

  • docker run: 這是Docker命令的基本部分,用于運(yùn)行一個(gè)新的容器。
  • -d: 這是一個(gè)選項(xiàng),表示在后臺(tái)(detached mode)運(yùn)行容器。
  • --name rmqnamesrv: 這是為容器指定一個(gè)名稱,該名稱為"rmqnamesrv"。
  • -p 9876:9876: 這是端口映射的選項(xiàng),將主機(jī)的端口9876映射到容器的端口9876。RocketMQ的Name Server默認(rèn)監(jiān)聽(tīng)端口是9876,通過(guò)這個(gè)映射,可以從主機(jī)的9876端口訪問(wèn)容器中運(yùn)行的RocketMQ Name Server。
  • -v /home/rocketmq/data/namesrv/logs:/root/logs: 這是用于將主機(jī)的/home/rocketmq/data/namesrv/logs目錄映射到容器內(nèi)的/root/logs目錄。這樣做的目的是將RocketMQ Name Server的日志文件存儲(chǔ)在主機(jī)的目錄中,方便查看和管理。
  • -v /home/rocketmq/data/namesrv/store:/root/store: 這是用于將主機(jī)的/home/rocketmq/data/namesrv/store目錄映射到容器內(nèi)的/root/store目錄。這樣做的目的是將RocketMQ Name Server的存儲(chǔ)文件存儲(chǔ)在主機(jī)的目錄中。
  • -e "MAX_POSSIBLE_HEAP=100000000": 這是用于設(shè)置環(huán)境變量的選項(xiàng),設(shè)置了RocketMQ Name Server的最大堆內(nèi)存大小為100,000,000字節(jié),約為100MB。
  • rocketmqinc/rocketmq: 這是指定要運(yùn)行的Docker鏡像的名稱。在這里,它使用了RocketMQ官方提供的Docker鏡像,名為rocketmqinc/rocketmq。
  • sh mqnamesrv: 這是在容器中要運(yùn)行的命令。在這里,它運(yùn)行了RocketMQ Name Server的啟動(dòng)命令。
3、啟動(dòng)Broker服務(wù)

創(chuàng)建Broker數(shù)據(jù)存儲(chǔ)路徑

mkdir -p /home/rocketmq/data/broker/logs /home/rocketmq/data/broker/store

創(chuàng)建conf配置文件目錄

mkdir /home/rocketmq/conf

在配置文件目錄下創(chuàng)建broker.conf配置文件

# 所屬集群名稱,如果節(jié)點(diǎn)較多可以配置多個(gè)
brokerClusterName = DefaultCluster
#broker名稱,master和slave使用相同的名稱,表明他們的主從關(guān)系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示幾點(diǎn)做消息刪除動(dòng)作,默認(rèn)是凌晨4點(diǎn)
deleteWhen = 04
#在磁盤上保留消息的時(shí)長(zhǎng),單位是小時(shí)
fileReservedTime = 48
#有三個(gè)值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和異步表示Master和Slave之間同步數(shù)據(jù)的機(jī)制;
brokerRole = ASYNC_MASTER
#刷盤策略,取值為:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盤和異步刷盤;SYNC_FLUSH消息寫(xiě)入磁盤后才返回成功狀態(tài),ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 設(shè)置broker節(jié)點(diǎn)所在服務(wù)器的ip地址
autoCreateTopicEnable=true
brokerIP1 = 你服務(wù)器外網(wǎng)ip

啟動(dòng)Broker容器 (注意先開(kāi)放10911和10909端口)

docker run -d --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v /home/rocketmq/data/broker/logs:/root/logs -v /home/rocketmq/data/broker/store:/root/store -v /home/rocketmq/conf/broker.conf:/opt/rocketmq/conf/broker.conf --privileged=true -e “NAMESRV_ADDR=namesrv:9876” -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf

解釋:

  • docker run: 這是Docker命令的基本部分,用于運(yùn)行一個(gè)新的容器。
  • -d: 這是一個(gè)選項(xiàng),表示在后臺(tái)(detached mode)運(yùn)行容器。
  • --name rmqbroker: 這是為容器指定一個(gè)名稱,該名稱為"rmqbroker"。
  • --link rmqnamesrv:namesrv: 這是用于將已經(jīng)運(yùn)行的RocketMQ Name Server容器 “rmqnamesrv” 鏈接到當(dāng)前運(yùn)行的Broker容器。這樣Broker容器就可以通過(guò)"namesrv"主機(jī)名訪問(wèn)Name Server。
  • -p 10911:10911: 這是端口映射的選項(xiàng),將主機(jī)的端口10911映射到容器的端口10911。RocketMQ的Broker默認(rèn)監(jiān)聽(tīng)端口是10911,通過(guò)這個(gè)映射,可以從主機(jī)的10911端口訪問(wèn)容器中運(yùn)行的RocketMQ Broker。
  • -p 10909:10909: 同上,將主機(jī)的端口10909映射到容器的端口10909。RocketMQ的Broker默認(rèn)監(jiān)聽(tīng)的另一個(gè)端口是10909,該端口用于向主節(jié)點(diǎn)發(fā)送心跳。
  • -v /home/rocketmq/data/broker/logs:/root/logs: 這是用于將主機(jī)的/home/rocketmq/data/broker/logs目錄映射到容器內(nèi)的/root/logs目錄。這樣做的目的是將RocketMQ Broker的日志文件存儲(chǔ)在主機(jī)的目錄中,方便查看和管理。
  • -v /home/rocketmq/data/broker/store:/root/store: 這是用于將主機(jī)的/home/rocketmq/data/broker/store目錄映射到容器內(nèi)的/root/store目錄。這樣做的目的是將RocketMQ Broker的存儲(chǔ)文件存儲(chǔ)在主機(jī)的目錄中。
  • -v /home/rocketmq/conf/broker.conf:/opt/rocketmq/conf/broker.conf: 這是用于將主機(jī)的/home/rocketmq/conf/broker.conf文件映射到容器內(nèi)的/opt/rocketmq/conf/broker.conf文件。這個(gè)文件是RocketMQ Broker的配置文件,通過(guò)這個(gè)映射,可以將自定義的Broker配置應(yīng)用到容器中。
  • --privileged=true: 這是為容器添加特權(quán)模式,這樣容器就可以獲得更高的權(quán)限。
  • -e "NAMESRV_ADDR=namesrv:9876": 這是用于設(shè)置環(huán)境變量的選項(xiàng),設(shè)置了RocketMQ Broker的Name Server地址為"namesrv:9876"。NAMESRV_ADDR是RocketMQ Broker連接Name Server的地址,這里設(shè)置為"namesrv:9876"表示通過(guò)名為"namesrv"的容器連接Name Server。
  • -e "MAX_POSSIBLE_HEAP=200000000": 這是用于設(shè)置環(huán)境變量的選項(xiàng),設(shè)置了RocketMQ Broker的最大堆內(nèi)存大小為200,000,000字節(jié),約為200MB。
  • rocketmqinc/rocketmq: 這是指定要運(yùn)行的Docker鏡像的名稱。在這里,它使用了RocketMQ官方提供的Docker鏡像,名為rocketmqinc/rocketmq。
  • sh mqbroker -c /opt/rocketmq/conf/broker.conf: 這是在容器中要運(yùn)行的命令。在這里,它運(yùn)行了RocketMQ Broker的啟動(dòng)命令,通過(guò)-c參數(shù)指定了配置文件的路徑。

啟動(dòng)控制臺(tái) (注意先開(kāi)放9999端口)

docker run -d --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=服務(wù)器的ip:9876 \

-Dcom.rocketmq.sendMessageWithVIPChannel=false \

-Duser.timezone=‘Asia/Shanghai’" -v /etc/localtime:/etc/localtime -p 9999:8080 styletang/rocketmq-console-ng

解釋:

  • docker run: 這是Docker命令的基本部分,用于運(yùn)行一個(gè)新的容器。
  • -d: 這是一個(gè)選項(xiàng),表示在后臺(tái)(detached mode)運(yùn)行容器。
  • --name rmqadmin: 這是為容器指定一個(gè)名稱,該名稱為"rmqadmin"。
  • -e "JAVA_OPTS=...": 這是用于設(shè)置Java虛擬機(jī)(JVM)運(yùn)行時(shí)的參數(shù)。在這里,它設(shè)置了三個(gè)參數(shù):
    • -Drocketmq.namesrv.addr=服務(wù)器的ip:9876:這是用于設(shè)置RocketMQ Name Server的地址。您需要將"服務(wù)器的ip"替換為實(shí)際的RocketMQ Name Server的IP地址,端口為9876。
    • -Dcom.rocketmq.sendMessageWithVIPChannel=false:這是用于設(shè)置RocketMQ消息發(fā)送時(shí)是否啟用VIP通道的參數(shù),將其設(shè)置為false表示禁用VIP通道。
    • -Duser.timezone='Asia/Shanghai':這是用于設(shè)置容器時(shí)區(qū)的參數(shù),將其設(shè)置為’Asia/Shanghai’表示使用上海時(shí)區(qū)。
  • -v /etc/localtime:/etc/localtime: 這是用于將主機(jī)的時(shí)區(qū)配置映射到容器內(nèi),保持容器與主機(jī)的時(shí)區(qū)一致。
  • -p 9999:8080: 這是端口映射的選項(xiàng),將主機(jī)的端口9999映射到容器的端口8080。RocketMQ控制臺(tái)使用8080端口,通過(guò)這個(gè)映射,可以從主機(jī)的9999端口訪問(wèn)容器中運(yùn)行的RocketMQ控制臺(tái)。
  • styletang/rocketmq-console-ng: 這是指定要運(yùn)行的Docker鏡像的名稱。在這里,它使用了RocketMQ控制臺(tái)NG的Docker鏡像,名為styletang/rocketmq-console-ng。

正常啟動(dòng)后的docker ps

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

4、訪問(wèn)控制臺(tái)

http://你的服務(wù)器外網(wǎng)ip:9999/

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

4.RocketMQ快速入門

4.x文檔 下文基于該文檔

5.0文檔

通過(guò)該部分可以快速入門RocketMQ提供的多種發(fā)送消息的模式,例如同步消息,異步消息,順序消息,延遲消息,事務(wù)消息等

消息發(fā)送和監(jiān)聽(tīng)的流程

消息生產(chǎn)者

1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名

2.指定NameServer地址

3.啟動(dòng)producer

4.創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體等

5.發(fā)送消息

6.關(guān)閉生產(chǎn)者producer

消息消費(fèi)者

1.創(chuàng)建消費(fèi)者consumer,制定消費(fèi)者組名

2.指定NameServer地址

3.創(chuàng)建監(jiān)聽(tīng)訂閱主題TopicTag

4.處理消息

5.啟動(dòng)消費(fèi)者consumer

了解了消息發(fā)送和監(jiān)聽(tīng)的流程,我們可以開(kāi)始簡(jiǎn)單的代碼實(shí)現(xiàn)

Start

創(chuàng)建一個(gè)空項(xiàng)目 RocketMQ-study

在空項(xiàng)目下創(chuàng)建一個(gè)新模板:

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

簡(jiǎn)單測(cè)試

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

引入依賴:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--原生api-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>5.1.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

測(cè)試一個(gè)簡(jiǎn)單的生產(chǎn)方法和消費(fèi)方法:

package com.wp.rocketmqdemo01.constant;

public interface MqConstant {
    /**
     * 生產(chǎn)者組名
     */
    String PRODUCER_GROUP = "test-producer-group";
    /**
     * 消費(fèi)者組名
     */
    String CONSUMER_GROUP = "test-consumer-group";
    /**
     * 主題
     */
    String TOPIC = "test-topic";
    /**
     * NameServer地址
     */
    String NAME_SRV_ADDR = "ip:9876";
}
package com.wp.rocketmqdemo01.demo;

import com.wp.rocketmqdemo01.constant.MqConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;

import java.util.List;

@Slf4j
public class SimpleTest01 {

    /**
     * 生產(chǎn)者
     * @throws Exception
     */
    @Test
    public void SimpleTestProducer() throws Exception {
        // 創(chuàng)建一個(gè)生產(chǎn)者(制定一個(gè)組名
        DefaultMQProducer producer = new DefaultMQProducer(MqConstant.PRODUCER_GROUP);
        // 指定NameServer地址,連接到NameServer
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // 啟動(dòng)生產(chǎn)者
        producer.start();
        // 創(chuàng)建一個(gè)消息
        Message message = new Message(MqConstant.TOPIC, MqConstant.TAG, "Hello RocketMQ".getBytes());
        // 發(fā)送消息
        SendResult sendResult = producer.send(message);
        log.info("消息發(fā)送結(jié)果:{}", sendResult);
        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }

    /**
     * 消費(fèi)者
     * @throws Exception
     */
    @Test
    public void SimpleTestConsumer() throws Exception {
        // 創(chuàng)建一個(gè)消費(fèi)者(制定一個(gè)組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConstant.CONSUMER_GROUP);
        // 指定NameServer地址,連接到NameServer
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // 訂閱主題 *表示訂閱所有
        consumer.subscribe(MqConstant.TOPIC, "*");
        // 注冊(cè)消息監(jiān)聽(tīng)器(一直監(jiān)聽(tīng),異步回調(diào)方法)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 這個(gè)就是消費(fèi)的方法 業(yè)務(wù)邏輯
                log.info("我是消費(fèi)者,我正在消費(fèi)消息");
                log.info(list.get(0).toString());
                for(MessageExt messageExt : list) {
                    log.info("消費(fèi)消息:{}", new String(messageExt.getBody()));
                }
                log.info("消息上下文:{}", consumeConcurrentlyContext);
                // 返回消費(fèi)狀態(tài)
                // 如果消費(fèi)成功,返回CONSUME_SUCCESS,消息會(huì)被消費(fèi)掉,從mq出隊(duì)
                // 如果消費(fèi)失敗,返回RECONSUME_LATER,消息會(huì)重新投遞,mq不會(huì)出隊(duì)
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動(dòng)消費(fèi)者 這個(gè)start一定要寫(xiě)在registerMessageListener下面
        consumer.start();
        // 保證消費(fèi)者不退出,掛起當(dāng)前jvm
        System.in.read();
    }
}

分別測(cè)試后得到如下結(jié)果:

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

  1. Broker Offset(代理器偏移量): Broker Offset是指消息隊(duì)列中的消息在Broker(消息代理器)上的偏移位置。當(dāng)生產(chǎn)者將消息發(fā)送到Broker時(shí),每條消息都會(huì)被賦予一個(gè)唯一的偏移量,表示該消息在隊(duì)列中的位置。Broker Offset主要由消息代理器維護(hù)和管理,用于追蹤消息的存儲(chǔ)和處理情況。
  2. Consumer Offset(消費(fèi)者偏移量): Consumer Offset是指消費(fèi)者在消費(fèi)消息時(shí)的位置偏移。當(dāng)消費(fèi)者成功消費(fèi)了一條消息后,會(huì)將自己的消費(fèi)偏移量記錄下來(lái),表示下次繼續(xù)消費(fèi)消息的起始位置。消費(fèi)者需要定期更新Consumer Offset,以保證消息處理的準(zhǔn)確性和可靠性。
  3. Diff Total(差異總數(shù)): Diff Total是Broker Offset和Consumer Offset之間的差異總和。也就是說(shuō),Diff Total表示消息隊(duì)列中已經(jīng)被生產(chǎn)者發(fā)送并存儲(chǔ)在Broker上的消息數(shù)量,但尚未被消費(fèi)者消費(fèi)的消息數(shù)量。Diff Total可以用來(lái)監(jiān)控消息隊(duì)列的堆積情況,幫助發(fā)現(xiàn)消息處理速度跟不上消息產(chǎn)生速度的問(wèn)題。

最簡(jiǎn)單的測(cè)試完成!

MQ的消費(fèi)模式

可以大致分為兩種:推(Push)模式和拉(Pull)模式

  1. 推(Push)模式: 在推模式中,消息隊(duì)列將消息直接推送給消費(fèi)者。一旦有新的消息產(chǎn)生并發(fā)送到隊(duì)列中,隊(duì)列會(huì)立即將該消息推送給已注冊(cè)的消費(fèi)者。這樣消費(fèi)者就可以及時(shí)收到并處理消息。推模式適用于需要實(shí)時(shí)響應(yīng)和高實(shí)時(shí)性的場(chǎng)景,比如即時(shí)通訊、實(shí)時(shí)推送等。
  2. 拉(Pull)模式: 在拉模式中,消費(fèi)者需要主動(dòng)從消息隊(duì)列中拉取消息。消費(fèi)者需要周期性地向隊(duì)列發(fā)起請(qǐng)求,查詢是否有新的消息可供消費(fèi)。如果隊(duì)列中有新消息,隊(duì)列會(huì)將這些消息返回給消費(fèi)者,然后消費(fèi)者再對(duì)這些消息進(jìn)行處理。拉模式適用于不需要實(shí)時(shí)響應(yīng)的場(chǎng)景,比如批量處理、數(shù)據(jù)同步等。

每種消費(fèi)模式都有其適用的場(chǎng)景和優(yōu)缺點(diǎn)。推模式能夠及時(shí)將消息推送給消費(fèi)者,實(shí)現(xiàn)了實(shí)時(shí)性和低延遲,但在高并發(fā)場(chǎng)景下可能會(huì)產(chǎn)生大量推送請(qǐng)求,增加系統(tǒng)壓力。而拉模式需要消費(fèi)者主動(dòng)輪詢消息隊(duì)列,可以控制消費(fèi)的速度,但可能會(huì)導(dǎo)致消息處理不及時(shí),影響系統(tǒng)的實(shí)時(shí)性。

發(fā)送同步消息

上面的快速入門就是發(fā)送同步消息,發(fā)送過(guò)后就會(huì)有一個(gè)返回值(SEND_OK),也就是mq服務(wù)器接收到消息后返回的一個(gè)確認(rèn),這種方式非常安全,但是性能上并沒(méi)有這么高,而且在mq集群中,也是要等到所有的從機(jī)都復(fù)制了消息以后才會(huì)返回,所以針對(duì)重要的消息可以選擇這種方式

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

發(fā)送異步消息

異步消息通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,即發(fā)送端不能容忍長(zhǎng)時(shí)間地等待Broker的響應(yīng)。發(fā)送完以后會(huì)有一個(gè)異步消息通知

    @Test
    public void syncProducer() throws Exception {
        // 創(chuàng)建默認(rèn)的生產(chǎn)者
        DefaultMQProducer producer = new DefaultMQProducer(MqConstant.PRODUCER_GROUP+ "_sync");
        // 設(shè)置nameServer地址
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // 啟動(dòng)實(shí)例
        producer.start();
        Message msg = new Message("TopicSyncTest", ("異步消息").getBytes());
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("發(fā)送成功 (后執(zhí)行)");
            }
            @Override
            public void onException(Throwable e) {
                log.info("發(fā)送失敗 (后執(zhí)行)");
            }
        });
        log.info("先執(zhí)行");
        // 掛起jvm 因?yàn)榛卣{(diào)是異步的不然測(cè)試不出來(lái)
        System.in.read();
        // 關(guān)閉實(shí)例
        producer.shutdown();
    }
發(fā)送單向消息

這種方式主要用在不關(guān)心發(fā)送結(jié)果的場(chǎng)景,這種方式吞吐量很大,但是存在消息丟失的風(fēng)險(xiǎn),例如日志信息的發(fā)送

    // 發(fā)送單向消息
    producer.sendOneway(msg);
發(fā)送延遲消息

消息放入mq后,過(guò)一段時(shí)間,才會(huì)被監(jiān)聽(tīng)到,然后消費(fèi)

比如下訂單業(yè)務(wù),提交了一個(gè)訂單就可以發(fā)送一個(gè)延時(shí)消息,15min后去檢查這個(gè)訂單的狀態(tài),如果還是未付款就取消訂單釋放庫(kù)存。

DelayLevel

    Message msg = new Message("TopicTest", ("延遲消息").getBytes());
    // 給這個(gè)消息設(shè)定一個(gè)延遲等級(jí)
    // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    msg.setDelayTimeLevel(3);
    // 發(fā)送單向消息
    producer.send(msg);

注意

RocketMQ支持以下幾個(gè)固定的延時(shí)等級(jí),等級(jí)1就對(duì)應(yīng)1s,以此類推,最高支持2h延遲

private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

投遞等級(jí)(delay level) 延遲時(shí)間 投遞等級(jí)(delay level) 延遲時(shí)間
1 1s 10 6min
2 5s 11 7min
3 10s 12 8min
4 30s 13 9min
5 1min 14 10min
6 2min 15 20min
7 3min 16 30min
8 4min 17 1h
9 5min 18 2h

修改延時(shí)級(jí)別

RocketMQ的延遲等級(jí)可以進(jìn)行修改,以滿足自己的業(yè)務(wù)需求,可以修改/添加新的level。具體見(jiàn)[該文章](RocketMQ進(jìn)階-延時(shí)消息 - 知乎 (zhihu.com))

同時(shí)5.0支持使用時(shí)間戳來(lái)設(shè)置延遲時(shí)間定時(shí)/延時(shí)消息 | RocketMQ (apache.org)

發(fā)送批量消息

可以一次性發(fā)送一組消息,那么這一組消息會(huì)被當(dāng)做一個(gè)消息消費(fèi)

    List<Message> msgs = Arrays.asList(
            new Message("TopicTest", "我是一組消息的A消息".getBytes()),
            new Message("TopicTest", "我是一組消息的B消息".getBytes()),
            new Message("TopicTest", "我是一組消息的C消息".getBytes())

    );
    SendResult send = producer.send(msgs);

發(fā)送順序消息

順序消息是一種特殊類型的消息,可以保證按照發(fā)送的順序進(jìn)行消費(fèi),從而保證了消息的有序性。

在 RocketMQ 中,保證消息順序發(fā)送的關(guān)鍵是要將相關(guān)的消息發(fā)送到同一個(gè)隊(duì)列中,并且消費(fèi)者按照隊(duì)列的順序來(lái)消費(fèi)消息

以下是實(shí)現(xiàn)順序消息的步驟:

  1. 創(chuàng)建一個(gè)指定順序的 MessageQueueSelector。 在發(fā)送消息時(shí),你需要指定一個(gè) MessageQueueSelector 來(lái)選擇目標(biāo)消息隊(duì)列。該 Selector 將根據(jù)某種規(guī)則將相關(guān)的消息發(fā)送到同一個(gè)隊(duì)列中,保證了消息的順序性。
  2. 設(shè)置 MessageQueueSelector 選擇消息隊(duì)列的邏輯。 在實(shí)現(xiàn) MessageQueueSelector 接口的 select 方法中,你需要編寫(xiě)邏輯來(lái)選擇目標(biāo)隊(duì)列??梢愿鶕?jù)消息的某個(gè)屬性或者業(yè)務(wù)關(guān)聯(lián)來(lái)確定消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列。
  3. 發(fā)送消息時(shí)使用 MessageQueueSelector。 在發(fā)送消息時(shí),使用 producer.send(msg, selector, orderId) 方法來(lái)指定消息發(fā)送的隊(duì)列。其中,selector 參數(shù)即為你實(shí)現(xiàn)的 MessageQueueSelector 接口的實(shí)例,orderId 是一個(gè)標(biāo)識(shí)消息順序的參數(shù)。
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 當(dāng)前主題有多少個(gè)隊(duì)列
                        int queueNumber = mqs.size();
                        // 這個(gè)arg就是后面?zhèn)魅氲?orderId
                        Integer id = (Integer) arg;
                        // 用這個(gè)值去%隊(duì)列的個(gè)數(shù)得到一個(gè)隊(duì)列
                        int index = id % queueNumber
                        // 返回選擇的這個(gè)隊(duì)列即可 ,那么相同的orderId 就會(huì)被放在相同的隊(duì)列里 實(shí)現(xiàn)First In, First                           //Out了
                        return mqs.get(index);
                    }
                }, orderId);

消費(fèi)者的監(jiān)聽(tīng) MessageListenerOrderly如下

    // 注冊(cè)一個(gè)消費(fèi)監(jiān)聽(tīng) MessageListenerOrderly 是順序消費(fèi) 單線程消費(fèi)
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new String(messageExt.getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

5.SpringBoot集成RocketMQ

搭建rocketmq-producer(消息生產(chǎn)者)模塊

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

完整的依賴
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
        
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-spring-boot-starter</artifactId>
			<version>2.1.1</version>
		</dependency>

	</dependencies>

修改配置文件application.yml
spring:
  application:
    name: rocketmq-producer
rocketmq:
  name-server: ip:9876     # rocketMq的nameServer地址
  producer:
    group: boot-test-producer-group        # 生產(chǎn)者組別
    send-message-timeout: 3000  # 消息發(fā)送的超時(shí)時(shí)間
    retry-times-when-send-async-failed: 2  # 異步消息發(fā)送失敗重試次數(shù)
    max-message-size: 4194304       # 消息的最大長(zhǎng)度

添加測(cè)試類的內(nèi)容:
	/**
	 * 注入rocketMQTemplate,我們使用它來(lái)操作mq
	 */
	@Autowired
	private RocketMQTemplate rocketMQTemplate;

	/**
	 * 測(cè)試發(fā)送簡(jiǎn)單的消息
	 * @throws Exception
	 */
	@Test
	public void testSimpleMsg() throws Exception {
		// boot-test是topic,我是一個(gè)簡(jiǎn)單的消息是消息內(nèi)容
		SendResult sendResult = rocketMQTemplate.syncSend("boot-test", "我是一個(gè)簡(jiǎn)單的消息");
		// 拿到消息的發(fā)送狀態(tài)
		log.info(String.valueOf(sendResult.getSendStatus()));
		// 拿到消息的id
		log.info(sendResult.getMsgId());
	}
執(zhí)行后,可得到:

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

同理創(chuàng)建消費(fèi)者模塊

修改配置文件application.yml
spring:
  application:
    name: rocketmq-consumer
rocketmq:
  name-server: 47.115.209.249:9876     # rocketMq?nameServer??
添加測(cè)試類的內(nèi)容:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 創(chuàng)建一個(gè)簡(jiǎn)單消息的監(jiān)聽(tīng)
 * 1.類上添加注解@Component和@RocketMQMessageListener
 *      @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
 *      topic指定消費(fèi)的主題,consumerGroup指定消費(fèi)組,一個(gè)主題可以有多個(gè)消費(fèi)者組,一個(gè)消息可以被多個(gè)不同的組的消費(fèi)者都消費(fèi)
 * 2.實(shí)現(xiàn)RocketMQListener接口,注意泛型的使用,可以為具體的類型,如果想拿到消息
 * 的其他參數(shù)可以寫(xiě)成MessageExt
 */
@Component
@RocketMQMessageListener(topic = "boot-test", consumerGroup = "boot-test-consumer-group",messageModel = MessageModel.CLUSTERING)
@Slf4j
public class SimpleMsgListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("接收到消息:{}",s);
    }
}
執(zhí)行后,可得到:

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

dockerdesktop安裝 rocketmq,java-rocketmq,rocketmq,linux

添加測(cè)試類的內(nèi)容:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 創(chuàng)建一個(gè)簡(jiǎn)單消息的監(jiān)聽(tīng)
 * 1.類上添加注解@Component和@RocketMQMessageListener
 *      @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
 *      topic指定消費(fèi)的主題,consumerGroup指定消費(fèi)組,一個(gè)主題可以有多個(gè)消費(fèi)者組,一個(gè)消息可以被多個(gè)不同的組的消費(fèi)者都消費(fèi)
 * 2.實(shí)現(xiàn)RocketMQListener接口,注意泛型的使用,可以為具體的類型,如果想拿到消息
 * 的其他參數(shù)可以寫(xiě)成MessageExt
 */
@Component
@RocketMQMessageListener(topic = "boot-test", consumerGroup = "boot-test-consumer-group",messageModel = MessageModel.CLUSTERING)
@Slf4j
public class SimpleMsgListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("接收到消息:{}",s);
    }
}
執(zhí)行后,可得到:

[外鏈圖片轉(zhuǎn)存中…(img-K8TtTKkm-1691029883688)]

[外鏈圖片轉(zhuǎn)存中…(img-flI5rjFX-1691029883688)]

6.最后

從文章整體沒(méi)有涉及太深入的一些機(jī)制和原理的講解,因此僅作為入門學(xué)習(xí)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-770303.html

到了這里,關(guān)于RocketMQ的windos/linux/docker超詳細(xì)安裝及簡(jiǎn)單入門!的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Linux】Linux環(huán)境下安裝RocketMQ(圖文解說(shuō)詳細(xì)版)

    【Linux】Linux環(huán)境下安裝RocketMQ(圖文解說(shuō)詳細(xì)版)

    消息隊(duì)列中間件是分布式系統(tǒng)中的重要組件,主要解決應(yīng)用耦合、流量削峰等問(wèn)題,目前主流的 MQ 主要是:RocketMQ、kafka、RabbitMQ等。 RocketMQ 相較于其它 MQ 的優(yōu)勢(shì): 支持事務(wù)型消息(消息發(fā)送和 DB 操作保持兩方的最終一致性,RabbitMQ 和 Kafka 不支持) 支持結(jié)合 RocketMQ 的多個(gè)

    2024年02月16日
    瀏覽(37)
  • Windows安裝使用Docker,方便你的開(kāi)發(fā)和部署(DockerDesktop篇)

    Windows安裝使用Docker,方便你的開(kāi)發(fā)和部署(DockerDesktop篇)

    前言 首先聲明,此篇不是完全的 Docker 技術(shù)文章,而是單純的教你使用 Docker , 不包含 Docker 的一些命令、如何打包 Docker 鏡像等等。 為什么要用 Docker ? 大家好,我是小簡(jiǎn),今天帶來(lái)一篇 Windosw 環(huán)境下使用 Docker 的教程,非常方便哦。 不需要說(shuō)什么容器化、什么持續(xù)集成,

    2024年02月03日
    瀏覽(14)
  • ELK實(shí)戰(zhàn),Linux版docker安裝ElasticSearch、ES-head、Logstash、Kiabana入門,無(wú)坑詳細(xì)圖解

    ELK實(shí)戰(zhàn),Linux版docker安裝ElasticSearch、ES-head、Logstash、Kiabana入門,無(wú)坑詳細(xì)圖解

    ????????項(xiàng)目需要,記錄一次ELK日志分析系統(tǒng)無(wú)坑初始安裝過(guò)程,并給大家整理出了操作elasticsearch的主要命令,elasticsearch!伙伴們都懂得哦!別的不多說(shuō),看過(guò)內(nèi)容概覽,直接開(kāi)整?。?! 1-1 修改/etc/security/limits.conf limits.conf文件限制著用戶可以使用的最大文件數(shù),最大線

    2023年04月09日
    瀏覽(23)
  • Docker安裝RabbitMQ詳細(xì)教程(簡(jiǎn)單版)

    Docker安裝RabbitMQ詳細(xì)教程(簡(jiǎn)單版)

    RabbitMQ是由erlang語(yǔ)言開(kāi)發(fā),基于AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開(kāi)發(fā)中應(yīng)用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com 前期準(zhǔn)備工作 ? 一般情況,需要先安裝部署Erlang環(huán)境再安裝

    2024年02月11日
    瀏覽(21)
  • 使用 Docker 安裝 RocketMQ 使用 docker 安裝 rocketmq

    Docker常用命令大全 RocketMQ 是一個(gè)分布式的消息中間件,由 NameServer 和Broker兩個(gè)角色組成,是一種典型的基于發(fā)布/訂閱模式的消息通信解決方案。 NameServer 是 RocketMQ 的命名服務(wù),可以理解為類似于 DNS 的服務(wù),它主要負(fù)責(zé)記錄 Topic 的路由信息和 Broker 的地址信息。每個(gè) Rocket

    2024年02月13日
    瀏覽(26)
  • rocketmq消息注解基于springboot的簡(jiǎn)單應(yīng)用及默認(rèn)詳細(xì)配置

    ? ? ? ??rocketmq消息組件在springboot框架中的應(yīng)用,使用rocketmq的整合包進(jìn)行編碼實(shí)現(xiàn)。 引入rocketmq依賴jar, 最新版本參考GitHub - apache/rocketmq-spring: Apache RocketMQ Spring Integration 基本配置 rocketmq.name-server: ?rocketmq集群地址,單點(diǎn)或集群 rocketmq.producer.group : 生產(chǎn)者組名,用于標(biāo)識(shí)一組

    2024年02月12日
    瀏覽(21)
  • MAC M1上docker rocketmq簡(jiǎn)單環(huán)境搭建和代碼

    MAC M1上docker rocketmq簡(jiǎn)單環(huán)境搭建和代碼

    工作了這么多年,rocketmq還沒(méi)有用過(guò),由于現(xiàn)在的工作中涉及到了,周六吃完午飯就開(kāi)始搞,結(jié)果到現(xiàn)在3點(diǎn)鐘才把環(huán)境弄好,測(cè)試代碼搞起。 整個(gè)流程分成兩步 安裝簡(jiǎn)單的rocket環(huán)境 起springboot項(xiàng)目測(cè)試 參考文章: https://blog.csdn.net/baidu_33256174/article/details/129599300 1. 制作rocket

    2024年02月15日
    瀏覽(31)
  • Linux Zookeeper在Docker 安裝與簡(jiǎn)單通信

    Linux Zookeeper在Docker 安裝與簡(jiǎn)單通信

    一、在本地計(jì)算機(jī)上安裝Docker 1.安裝Docker (安裝最新的Docker版本) yum install docker-ce docker-ce-cli containerd.io docker-bulidx-plugin docker-compose-plugin 2.查看Docker版本并啟動(dòng)Docker docker version 安裝成功的Docker版本為24.0.6 systemctl start docker Docker 成功啟動(dòng) 二、在Docker中構(gòu)建映像(image),安裝Z

    2024年02月04日
    瀏覽(13)
  • ubuntu安裝、使用docker超級(jí)詳細(xì)的入門教程

    ubuntu安裝、使用docker超級(jí)詳細(xì)的入門教程

    查看官網(wǎng)(https://docs.docker.com/get-docker),根據(jù)系統(tǒng)版本安裝。 刪除老版本 設(shè)置儲(chǔ)存庫(kù) 添加官方秘鑰 授權(quán)docker并更新apt 安裝最新docker 試運(yùn)行 卸載應(yīng)用 刪除目錄 默認(rèn)路徑 登錄阿里云平臺(tái)(支付寶就能登錄) 找到容器鏡像服務(wù) 控制臺(tái)-產(chǎn)品與服務(wù)-彈性計(jì)算-容器鏡像服務(wù) 找到鏡

    2024年02月05日
    瀏覽(41)
  • Git快速入門篇—— Windows版本淘寶鏡像快速下載安裝詳細(xì)步驟及簡(jiǎn)單入門教程(附帶圖文教程)

    Git快速入門篇—— Windows版本淘寶鏡像快速下載安裝詳細(xì)步驟及簡(jiǎn)單入門教程(附帶圖文教程)

    前言:我們平時(shí)在整理代碼的時(shí)候,尤其是與別人一起開(kāi)發(fā)項(xiàng)目的時(shí)候,常常涉及到代碼的更新,因此代碼版本問(wèn)題成了一個(gè)很頭痛的事。而git正是為了解決這種問(wèn)題而誕生。本文將詳細(xì)介紹如何通過(guò)淘寶鏡像進(jìn)行g(shù)it的安裝以及git的簡(jiǎn)單入門技巧。 下一章: git與遠(yuǎn)程倉(cāng)庫(kù)的交

    2024年02月03日
    瀏覽(31)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包