RocketMQ簡(jiǎn)單入門
本文若有不當(dāng)之處歡迎提出pr/issue
主要內(nèi)容:
-
初識(shí)MQ
-
RocketMQ簡(jiǎn)介
-
RocketMQ安裝
-
RocketMQ快速入門
-
SpringBoot集成RocketMQ
-
最后
1.初識(shí)MQ
1.1.同步和異步通訊
微服務(wù)間通訊有同步和異步兩種方式:
同步通訊:就像打電話,需要實(shí)時(shí)響應(yīng)。
異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。
兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會(huì)有延遲。
1.1.1.同步通訊
Feign調(diào)用就屬于同步方式,雖然調(diào)用可以實(shí)時(shí)得到結(jié)果,但存在下面的問(wèn)題:
總結(jié):
同步調(diào)用的優(yōu)點(diǎn):
- 時(shí)效性較強(qiáng),可以立即得到結(jié)果
同步調(diào)用的問(wèn)題:
- 耦合度高
- 性能和吞吐能力下降
- 有額外的資源消耗
- 有級(jí)聯(lián)失敗問(wèn)題(由于一個(gè)故障導(dǎo)致了連鎖反應(yīng),使得系統(tǒng)中的其他組件或節(jié)點(diǎn)也相繼失?。?/li>
1.1.2.異步通訊
異步調(diào)用則可以避免上述問(wèn)題:
我們以購(gòu)買商品為例,用戶支付后需要調(diào)用訂單服務(wù)完成訂單狀態(tài)修改,調(diào)用物流服務(wù),從倉(cāng)庫(kù)分配響應(yīng)的庫(kù)存并準(zhǔn)備發(fā)貨。
在事件模式中,支付服務(wù)是事件發(fā)布者(publisher),在支付完成后只需要發(fā)布一個(gè)支付成功的事件(event),事件中帶上訂單id。
訂單服務(wù)和物流服務(wù)是事件訂閱者(Consumer),訂閱支付成功的事件,監(jiān)聽(tīng)到事件后完成自己業(yè)務(wù)即可。
為了解除事件發(fā)布者與訂閱者之間的耦合,兩者并不是直接通信,而是有一個(gè)中間人(Broker)。發(fā)布者發(fā)布事件到Broker,不關(guān)心誰(shuí)來(lái)訂閱事件。訂閱者從Broker訂閱事件,不關(guān)心誰(shuí)發(fā)來(lái)的消息。
Broker 是一個(gè)像數(shù)據(jù)總線一樣的東西,所有的服務(wù)要接收數(shù)據(jù)和發(fā)送數(shù)據(jù)都發(fā)到這個(gè)總線上,這個(gè)總線就像協(xié)議一樣,讓服務(wù)間的通訊變得標(biāo)準(zhǔn)和可控。
好處:
-
吞吐量提升:無(wú)需等待訂閱者處理完成,響應(yīng)更快速
-
故障隔離:服務(wù)沒(méi)有直接調(diào)用,不存在級(jí)聯(lián)失敗問(wèn)題
-
調(diào)用間沒(méi)有阻塞,不會(huì)造成無(wú)效的資源占用
-
耦合度極低,每個(gè)服務(wù)都可以靈活插拔,可替換
-
流量削峰:不管發(fā)布事件的流量波動(dòng)多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件
缺點(diǎn):
- 架構(gòu)復(fù)雜了,業(yè)務(wù)沒(méi)有明顯的流程線,不好管理
- 需要依賴于Broker的可靠、安全、性能
好在現(xiàn)在開(kāi)源軟件或云平臺(tái)上 Broker 的軟件是非常成熟的,比較常見(jiàn)的一種就是我們今天要學(xué)習(xí)的MQ技術(shù)。
1.2.技術(shù)對(duì)比:
MQ,中文是消息隊(duì)列(MessageQueue),字面來(lái)看就是存放消息的隊(duì)列。也就是事件驅(qū)動(dòng)架構(gòu)中的Broker。
幾種常見(jiàn)MQ的對(duì)比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社區(qū) | Rabbit | Apache | 阿里 | Apache |
開(kāi)發(fā)語(yǔ)言 | Erlang | Java | Java | Scala&Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協(xié)議 | 自定義協(xié)議 |
可用性 | 高 | 一般 | 高 | 高 |
單機(jī)吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級(jí) | 毫秒級(jí) | 毫秒級(jí) | 毫秒以內(nèi) |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延遲:RabbitMQ、Kafka
不同的消息隊(duì)列系統(tǒng)在不同場(chǎng)景下有各自的優(yōu)勢(shì)和適用性。以下是各個(gè)消息隊(duì)列系統(tǒng)在不同場(chǎng)合下的最佳選擇:
- Kafka:
- 最佳場(chǎng)合:大規(guī)模數(shù)據(jù)處理、實(shí)時(shí)日志收集和分析、流式處理。
- 優(yōu)勢(shì):高吞吐量、低延遲、水平擴(kuò)展能力強(qiáng)、長(zhǎng)期消息存儲(chǔ),適合構(gòu)建大規(guī)模的實(shí)時(shí)數(shù)據(jù)流處理平臺(tái),如實(shí)時(shí)日志收集和分析、事件流處理等。
- RabbitMQ:
- 最佳場(chǎng)合:傳統(tǒng)的企業(yè)級(jí)應(yīng)用、輕量級(jí)的消息傳遞場(chǎng)景。
- 優(yōu)勢(shì):簡(jiǎn)單易用、支持多種消息協(xié)議、適合點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱模式,對(duì)于傳統(tǒng)的企業(yè)應(yīng)用和中小規(guī)模的消息傳遞需求,是一種可靠的選擇。
- ActiveMQ:
- 最佳場(chǎng)合:中小規(guī)模的企業(yè)應(yīng)用、Java生態(tài)系統(tǒng)中的集成需求。
- 優(yōu)勢(shì):Java開(kāi)發(fā)環(huán)境友好、支持多種消息協(xié)議,適合與Java生態(tài)系統(tǒng)的其他組件集成,如Spring框架等。
- RocketMQ:
- 最佳場(chǎng)合:大規(guī)模的分布式系統(tǒng)、互聯(lián)網(wǎng)應(yīng)用、金融領(lǐng)域的消息處理。
- 優(yōu)勢(shì):高吞吐量、低延遲、豐富的消息存儲(chǔ)模式,適用于處理大規(guī)模的消息傳遞場(chǎng)景,特別是在互聯(lián)網(wǎng)和金融領(lǐng)域。
綜合考慮以上因素,可以做如下簡(jiǎn)單總結(jié):
- 如果需要處理大規(guī)模的實(shí)時(shí)數(shù)據(jù)流、日志收集和分析等高吞吐量場(chǎng)景,首選
Kafka
。 - 如果對(duì)于消息傳遞的簡(jiǎn)單性和易用性有較高要求,適合中小規(guī)模的企業(yè)應(yīng)用和輕量級(jí)消息傳遞需求,可以選擇
RabbitMQ
或ActiveMQ
。 - 如果在大規(guī)模的分布式系統(tǒng)、互聯(lián)網(wǎng)應(yīng)用或金融領(lǐng)域需要處理消息傳遞,
RocketMQ
是一個(gè)較好的選擇。
2.RocketMQ簡(jiǎn)介
官網(wǎng): http://rocketmq.apache.org/
RocketMQ是阿里巴巴2016年MQ中間件,使用Java語(yǔ)言開(kāi)發(fā),RocketMQ 是一款開(kāi)源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù),提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)。同時(shí),廣泛應(yīng)用于多個(gè)領(lǐng)域,包括異步通信解耦、企業(yè)解決方案、金融支付、電信、電子商務(wù)、快遞物流、廣告營(yíng)銷、社交、即時(shí)通信、移動(dòng)應(yīng)用、手游、視頻、物聯(lián)網(wǎng)、車聯(lián)網(wǎng)等。
RocketMQ的設(shè)計(jì)目標(biāo)是支持大規(guī)模消息處理,具有高并發(fā)、高可用和容錯(cuò)能力。它在多個(gè)方面提供了強(qiáng)大的功能和特性:
- 分布式架構(gòu):RocketMQ采用分布式架構(gòu),支持在多個(gè)節(jié)點(diǎn)之間進(jìn)行消息的發(fā)送和接收,實(shí)現(xiàn)了水平擴(kuò)展能力。
- 高吞吐量:RocketMQ可以在大規(guī)模并發(fā)場(chǎng)景下實(shí)現(xiàn)高吞吐量的消息處理,適用于高并發(fā)的業(yè)務(wù)場(chǎng)景。
- 低延遲:RocketMQ具有較低的消息傳遞延遲,適用于需要實(shí)時(shí)性的應(yīng)用場(chǎng)景。
- 消息可靠性:RocketMQ提供了多種消息存儲(chǔ)模式,可以確保消息的可靠傳遞,包括同步刷盤和異步刷盤等方式。
- 消息順序性:RocketMQ支持消息的順序傳遞,可以確保同一消息隊(duì)列中的消息按照發(fā)送順序被消費(fèi)。
- 支持多種消息模式:RocketMQ支持發(fā)布/訂閱模式和點(diǎn)對(duì)點(diǎn)模式,可以根據(jù)業(yè)務(wù)需求選擇合適的消息模式。
- 靈活的部署方式:RocketMQ支持多種部署方式,可以在單機(jī)上運(yùn)行,也可以搭建集群部署。
- 豐富的監(jiān)控和管理工具:RocketMQ提供了豐富的監(jiān)控和管理工具,方便管理員對(duì)消息隊(duì)列進(jìn)行監(jiān)控和管理。
核心概念
Producer:消息的發(fā)送者,生產(chǎn)者;舉例:發(fā)件人。
Consumer:消息接收者,消費(fèi)者;舉例:收件人。
Broker:消息隊(duì)列的中間服務(wù)器,負(fù)責(zé)存儲(chǔ)消息并將消息傳遞給消費(fèi)者;舉例:快遞。
NameServer:可以理解為是一個(gè)注冊(cè)中心,主要是用來(lái)保存topic路由信息,管理Broker。在NameServer的集群中,NameServer與NameServer之間是沒(méi)有任何通信的;舉例:各個(gè)快遞公司的管理機(jī)構(gòu)相當(dāng)于broker的注冊(cè)中心,保留了broker的信息。
Queue:隊(duì)列,消息存放的位置,一個(gè)Broker中可以有多個(gè)隊(duì)列。
Topic:消息的邏輯分類,生產(chǎn)者發(fā)送消息到指定的Topic,消費(fèi)者從指定的Topic訂閱消息。一個(gè)Topic可以有多個(gè)Producer和多個(gè)Consumer。
ProducerGroup:生產(chǎn)者組 。
ConsumerGroup:消費(fèi)者組,多個(gè)消費(fèi)者組可以同時(shí)消費(fèi)一個(gè)主題的消息。
工作流程
該部分轉(zhuǎn)載自 [掘金文章](RocketMQ保姆級(jí)教程 - 掘金 (juejin.cn))
通過(guò)這張圖就可以很清楚的知道,RocketMQ大致的工作流程:
-
Broker
啟動(dòng)的時(shí)候,會(huì)往每臺(tái)NameServer(因?yàn)镹ameServer之間不通信,所以每臺(tái)都得注冊(cè))注冊(cè)自己的信息,這些信息包括自己的ip和端口號(hào),自己這臺(tái)Broker有哪些topic等信息。 -
Producer
在啟動(dòng)之后會(huì)跟會(huì)NameServer建立連接,定期從NameServer中獲取Broker的信息,當(dāng)發(fā)送消息的時(shí)候,會(huì)根據(jù)消息需要發(fā)送到哪個(gè)topic去找對(duì)應(yīng)的Broker地址,如果有的話,就向這臺(tái)Broker發(fā)送請(qǐng)求;沒(méi)有找到的話,就看根據(jù)是否允許自動(dòng)創(chuàng)建topic來(lái)決定是否發(fā)送消息。 -
Broker
在接收到Producer的消息之后,會(huì)將消息存起來(lái),持久化,如果有從節(jié)點(diǎn)的話,也會(huì)主動(dòng)同步給從節(jié)點(diǎn),實(shí)現(xiàn)數(shù)據(jù)的備份 -
Consumer
啟動(dòng)之后也會(huì)跟會(huì)NameServer建立連接,定期從NameServer中獲取Broker和對(duì)應(yīng)topic的信息,然后根據(jù)自己需要訂閱的topic信息找到對(duì)應(yīng)的Broker的地址,然后跟Broker建立連接,獲取消息,進(jìn)行消費(fèi)
3.RocketMQ安裝
本文檔所涉及的是單機(jī)版的RocketMQ安裝教程,能夠滿足基本的學(xué)習(xí)使用,屬于入門級(jí)的教程,如果想要搭集群部署,可以參考其他資料,進(jìn)行配置即可
3.1.Windos下的安裝
所需環(huán)境
Windows 64位系統(tǒng)
JDK1.8(64位)
Maven
進(jìn)入[RocketMQ官網(wǎng)下載](下載 | RocketMQ (apache.org))
1、選擇Binary 下載
2、將壓縮包解壓至自定路徑
3、配置系統(tǒng)中的環(huán)境變量
變量名:ROCKETMQ_HOME
變量值:(如圖瀏覽目錄選擇指定bin-release文件夾路徑)
4.啟動(dòng)RocketMQ
在自己安裝的RocketMQ的bin目錄下執(zhí)行cmd命令,輸入下面命令,啟動(dòng)NameServer
start mqnamesrv.cmd
若出現(xiàn)如上圖所示的命令框,說(shuō)明啟動(dòng)成功,保留窗口切勿關(guān)閉
繼續(xù)啟動(dòng)broker
與上述同樣的路徑下呼出cmd,執(zhí)行如下命令:
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable = true
看到上述命令框彈出即完成對(duì)RocketMQ的啟動(dòng)。
注意:
RocketMQ默認(rèn)的虛擬機(jī)內(nèi)存較大,啟動(dòng)如果因?yàn)閮?nèi)存不足報(bào)錯(cuò)可執(zhí)行以下步驟:
用記事本打開(kāi)bin目錄下的 runbroker.cmd
-Xms2g
:設(shè)置JVM初始堆內(nèi)存大小為2GB。-Xmx2g
:設(shè)置JVM最大堆內(nèi)存大小為2GB。可修改為 -Xms256m -Xmx256m -Xmn128m
同理打開(kāi)runserver.cmd
修改jvm參數(shù)為
-Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m
5.配置可視化頁(yè)面
下載可視化插件源碼
github下載地址:https://github.com/apache/rocketmq-dashboard
復(fù)制下載鏈接后使用git下載
可自建文件夾,進(jìn)入后使用git bash下載
git clone https://github.com/apache/rocketmq-dashboard.git
下載完成后,進(jìn)入application.yml
中查看配置
保存后進(jìn)入到 …/rocketmq-dashboard目錄下,鼠標(biāo)右鍵進(jìn)入git控制臺(tái)
執(zhí)行 mvn clean package -Dmaven.test.skip=true
將該文件打包成jar包,該jar包保存在 該目錄的 target子目錄下
打包完成!
在 target子目錄下可找到對(duì)應(yīng)的jar包
在該目錄下打開(kāi)cmd,輸入指令==(請(qǐng)保證已經(jīng)運(yùn)行NameServer和broker)==:
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
成功執(zhí)行jar包
然后在網(wǎng)頁(yè)中訪問(wèn) http://127.0.0.1:8080/#/ 即可進(jìn)入rocketmq的圖形化界面
3.2.Linux下的安裝
請(qǐng)?zhí)崆霸O(shè)置服務(wù)器的防火墻,放通9876和10909(默認(rèn)的 RocketMQ Broker 端口號(hào))端口
進(jìn)入[RocketMQ官網(wǎng)下載](下載 | RocketMQ (apache.org))
1、選擇Binary 下載
2、在linux中創(chuàng)建RocketMQ文件夾
mkdir RocketMQ
3、將rocketmq-all-5.1.2-bin-release.zip壓縮文件上傳到linux服務(wù)器中
連接工具XSHELL - NetSarang Website
將壓縮包上傳到第2步創(chuàng)建的文件中
4、解壓zip包
cd ./RocketMQ/
unzip rocketmq-all-5.1.2-bin-release.zip
如果你的服務(wù)器沒(méi)有unzip命令,則下載安裝一個(gè)
yum install unzip
5、配置環(huán)境變量
vim /etc/profile
在文件末尾添加
export NAMESRV_ADDR=服務(wù)器IP:9876
6、修改腳本文件
修改目錄/root/RocketMQ/rocketmq-all-5.1.2-bin-release/bin下的配置文件: runserver.sh
、runbroker.sh
修改runserver.sh
中原有內(nèi)存配置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改runbroker.sh 中原有內(nèi)存配置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
修改目錄/root/RocketMQ/rocketmq-all-5.1.2-bin-release/conf/broker.conf文件
在最后添加上
namesrvAddr = (服務(wù)器ip):9876
autoCreateTopicEnable=true
brokerIP1 = (服務(wù)器ip)
7、啟動(dòng)
進(jìn)入/root/RocketMQ/rocketmq-all-5.1.2-bin-release
首先在安裝目錄下創(chuàng)建一個(gè)logs文件夾,用于存放日志
mkdir logs
運(yùn)行兩條命令,啟動(dòng)NameServer和broker
nohup sh bin/mqnamesrv > ./logs/namesrv.log &
nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &
運(yùn)行后可在logs文件夾下看到兩個(gè)日志文件
8.配置可視化頁(yè)面
前置步驟參考windows下的第5步5.配置可視化頁(yè)面
將jar包上傳到服務(wù)器的/root/RocketMQ中
然后在RockerMQ中運(yùn)行指令:
nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar rocketmq.config.namesrvAddr=127.0.0.1:9876
命令拓展:–server.port指定運(yùn)行的端口
–rocketmq.config.namesrvAddr=127.0.0.1:9876 指定namesrv地址
成功運(yùn)行!
最后訪問(wèn) 服務(wù)器ip:8080 即可訪問(wèn)到圖形化界面
3.3.docker安裝(推薦)
1、下載RockerMQ需要的鏡像
docker pull rocketmqinc/rocketmq
docker pull styletang/rocketmq-console-ng
2、啟動(dòng)NameServer服務(wù)
創(chuàng)建NameServer數(shù)據(jù)存儲(chǔ)路徑
mkdir -p /home/rocketmq/data/namesrv/logs /home/rocketmq/data/namesrv/store
啟動(dòng)NameServer容器
docker run -d --name rmqnamesrv -p 9876:9876 -v /home/rocketmq/data/namesrv/logs:/root/logs -v /home/rocketmq/data/namesrv/store:/root/store -e “MAX_POSSIBLE_HEAP=100000000” rocketmqinc/rocketmq sh mqnamesrv
這是一個(gè)Docker命令,用于在Docker容器中運(yùn)行RocketMQ Name Server(消息服務(wù)器)。讓我們逐步解釋這個(gè)命令:
bashCopy codedocker run -d \
--name rmqnamesrv \
-p 9876:9876 \
-v /home/rocketmq/data/namesrv/logs:/root/logs \
-v /home/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq sh mqnamesrv
解釋:
-
docker run
: 這是Docker命令的基本部分,用于運(yùn)行一個(gè)新的容器。 -
-d
: 這是一個(gè)選項(xiàng),表示在后臺(tái)(detached mode)運(yùn)行容器。 -
--name rmqnamesrv
: 這是為容器指定一個(gè)名稱,該名稱為"rmqnamesrv"。 -
-p 9876:9876
: 這是端口映射的選項(xiàng),將主機(jī)的端口9876映射到容器的端口9876。RocketMQ的Name Server默認(rèn)監(jiān)聽(tīng)端口是9876,通過(guò)這個(gè)映射,可以從主機(jī)的9876端口訪問(wèn)容器中運(yùn)行的RocketMQ Name Server。 -
-v /home/rocketmq/data/namesrv/logs:/root/logs
: 這是用于將主機(jī)的/home/rocketmq/data/namesrv/logs
目錄映射到容器內(nèi)的/root/logs
目錄。這樣做的目的是將RocketMQ Name Server的日志文件存儲(chǔ)在主機(jī)的目錄中,方便查看和管理。 -
-v /home/rocketmq/data/namesrv/store:/root/store
: 這是用于將主機(jī)的/home/rocketmq/data/namesrv/store
目錄映射到容器內(nèi)的/root/store
目錄。這樣做的目的是將RocketMQ Name Server的存儲(chǔ)文件存儲(chǔ)在主機(jī)的目錄中。 -
-e "MAX_POSSIBLE_HEAP=100000000"
: 這是用于設(shè)置環(huán)境變量的選項(xiàng),設(shè)置了RocketMQ Name Server的最大堆內(nèi)存大小為100,000,000字節(jié),約為100MB。 -
rocketmqinc/rocketmq
: 這是指定要運(yùn)行的Docker鏡像的名稱。在這里,它使用了RocketMQ官方提供的Docker鏡像,名為rocketmqinc/rocketmq
。 -
sh mqnamesrv
: 這是在容器中要運(yùn)行的命令。在這里,它運(yùn)行了RocketMQ Name Server的啟動(dòng)命令。
3、啟動(dòng)Broker服務(wù)
創(chuàng)建Broker數(shù)據(jù)存儲(chǔ)路徑
mkdir -p /home/rocketmq/data/broker/logs /home/rocketmq/data/broker/store
創(chuàng)建conf配置文件目錄
mkdir /home/rocketmq/conf
在配置文件目錄下創(chuàng)建broker.conf配置文件
# 所屬集群名稱,如果節(jié)點(diǎn)較多可以配置多個(gè)
brokerClusterName = DefaultCluster
#broker名稱,master和slave使用相同的名稱,表明他們的主從關(guān)系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示幾點(diǎn)做消息刪除動(dòng)作,默認(rèn)是凌晨4點(diǎn)
deleteWhen = 04
#在磁盤上保留消息的時(shí)長(zhǎng),單位是小時(shí)
fileReservedTime = 48
#有三個(gè)值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和異步表示Master和Slave之間同步數(shù)據(jù)的機(jī)制;
brokerRole = ASYNC_MASTER
#刷盤策略,取值為:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盤和異步刷盤;SYNC_FLUSH消息寫(xiě)入磁盤后才返回成功狀態(tài),ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 設(shè)置broker節(jié)點(diǎn)所在服務(wù)器的ip地址
autoCreateTopicEnable=true
brokerIP1 = 你服務(wù)器外網(wǎng)ip
啟動(dòng)Broker容器 (注意先開(kāi)放10911和10909端口)
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v /home/rocketmq/data/broker/logs:/root/logs -v /home/rocketmq/data/broker/store:/root/store -v /home/rocketmq/conf/broker.conf:/opt/rocketmq/conf/broker.conf --privileged=true -e “NAMESRV_ADDR=namesrv:9876” -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf
解釋:
-
docker run
: 這是Docker命令的基本部分,用于運(yùn)行一個(gè)新的容器。 -
-d
: 這是一個(gè)選項(xiàng),表示在后臺(tái)(detached mode)運(yùn)行容器。 -
--name rmqbroker
: 這是為容器指定一個(gè)名稱,該名稱為"rmqbroker"。 -
--link rmqnamesrv:namesrv
: 這是用于將已經(jīng)運(yùn)行的RocketMQ Name Server容器 “rmqnamesrv” 鏈接到當(dāng)前運(yùn)行的Broker容器。這樣Broker容器就可以通過(guò)"namesrv"主機(jī)名訪問(wèn)Name Server。 -
-p 10911:10911
: 這是端口映射的選項(xiàng),將主機(jī)的端口10911映射到容器的端口10911。RocketMQ的Broker默認(rèn)監(jiān)聽(tīng)端口是10911,通過(guò)這個(gè)映射,可以從主機(jī)的10911端口訪問(wèn)容器中運(yùn)行的RocketMQ Broker。 -
-p 10909:10909
: 同上,將主機(jī)的端口10909映射到容器的端口10909。RocketMQ的Broker默認(rèn)監(jiān)聽(tīng)的另一個(gè)端口是10909,該端口用于向主節(jié)點(diǎn)發(fā)送心跳。 -
-v /home/rocketmq/data/broker/logs:/root/logs
: 這是用于將主機(jī)的/home/rocketmq/data/broker/logs
目錄映射到容器內(nèi)的/root/logs
目錄。這樣做的目的是將RocketMQ Broker的日志文件存儲(chǔ)在主機(jī)的目錄中,方便查看和管理。 -
-v /home/rocketmq/data/broker/store:/root/store
: 這是用于將主機(jī)的/home/rocketmq/data/broker/store
目錄映射到容器內(nèi)的/root/store
目錄。這樣做的目的是將RocketMQ Broker的存儲(chǔ)文件存儲(chǔ)在主機(jī)的目錄中。 -
-v /home/rocketmq/conf/broker.conf:/opt/rocketmq/conf/broker.conf
: 這是用于將主機(jī)的/home/rocketmq/conf/broker.conf
文件映射到容器內(nèi)的/opt/rocketmq/conf/broker.conf
文件。這個(gè)文件是RocketMQ Broker的配置文件,通過(guò)這個(gè)映射,可以將自定義的Broker配置應(yīng)用到容器中。 -
--privileged=true
: 這是為容器添加特權(quán)模式,這樣容器就可以獲得更高的權(quán)限。 -
-e "NAMESRV_ADDR=namesrv:9876"
: 這是用于設(shè)置環(huán)境變量的選項(xiàng),設(shè)置了RocketMQ Broker的Name Server地址為"namesrv:9876"。NAMESRV_ADDR
是RocketMQ Broker連接Name Server的地址,這里設(shè)置為"namesrv:9876"表示通過(guò)名為"namesrv"的容器連接Name Server。 -
-e "MAX_POSSIBLE_HEAP=200000000"
: 這是用于設(shè)置環(huán)境變量的選項(xiàng),設(shè)置了RocketMQ Broker的最大堆內(nèi)存大小為200,000,000字節(jié),約為200MB。 -
rocketmqinc/rocketmq
: 這是指定要運(yùn)行的Docker鏡像的名稱。在這里,它使用了RocketMQ官方提供的Docker鏡像,名為rocketmqinc/rocketmq
。 -
sh mqbroker -c /opt/rocketmq/conf/broker.conf
: 這是在容器中要運(yùn)行的命令。在這里,它運(yùn)行了RocketMQ Broker的啟動(dòng)命令,通過(guò)-c
參數(shù)指定了配置文件的路徑。
啟動(dòng)控制臺(tái) (注意先開(kāi)放9999端口)
docker run -d --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=服務(wù)器的ip:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false \
-Duser.timezone=‘Asia/Shanghai’" -v /etc/localtime:/etc/localtime -p 9999:8080 styletang/rocketmq-console-ng
解釋:
-
docker run
: 這是Docker命令的基本部分,用于運(yùn)行一個(gè)新的容器。 -
-d
: 這是一個(gè)選項(xiàng),表示在后臺(tái)(detached mode)運(yùn)行容器。 -
--name rmqadmin
: 這是為容器指定一個(gè)名稱,該名稱為"rmqadmin"。 -
-e "JAVA_OPTS=..."
: 這是用于設(shè)置Java虛擬機(jī)(JVM)運(yùn)行時(shí)的參數(shù)。在這里,它設(shè)置了三個(gè)參數(shù):-
-Drocketmq.namesrv.addr=服務(wù)器的ip:9876
:這是用于設(shè)置RocketMQ Name Server的地址。您需要將"服務(wù)器的ip"替換為實(shí)際的RocketMQ Name Server的IP地址,端口為9876。 -
-Dcom.rocketmq.sendMessageWithVIPChannel=false
:這是用于設(shè)置RocketMQ消息發(fā)送時(shí)是否啟用VIP通道的參數(shù),將其設(shè)置為false表示禁用VIP通道。 -
-Duser.timezone='Asia/Shanghai'
:這是用于設(shè)置容器時(shí)區(qū)的參數(shù),將其設(shè)置為’Asia/Shanghai’表示使用上海時(shí)區(qū)。
-
-
-v /etc/localtime:/etc/localtime
: 這是用于將主機(jī)的時(shí)區(qū)配置映射到容器內(nèi),保持容器與主機(jī)的時(shí)區(qū)一致。 -
-p 9999:8080
: 這是端口映射的選項(xiàng),將主機(jī)的端口9999映射到容器的端口8080。RocketMQ控制臺(tái)使用8080端口,通過(guò)這個(gè)映射,可以從主機(jī)的9999端口訪問(wèn)容器中運(yùn)行的RocketMQ控制臺(tái)。 -
styletang/rocketmq-console-ng
: 這是指定要運(yùn)行的Docker鏡像的名稱。在這里,它使用了RocketMQ控制臺(tái)NG的Docker鏡像,名為styletang/rocketmq-console-ng
。
正常啟動(dòng)后的docker ps
4、訪問(wèn)控制臺(tái)
http://你的服務(wù)器外網(wǎng)ip:9999/
4.RocketMQ快速入門
4.x文檔 下文基于該文檔
5.0文檔
通過(guò)該部分可以快速入門RocketMQ提供的多種發(fā)送消息的模式,例如同步消息,異步消息,順序消息,延遲消息,事務(wù)消息等
消息發(fā)送和監(jiān)聽(tīng)的流程
消息生產(chǎn)者
1.創(chuàng)建消息生產(chǎn)者producer
,并制定生產(chǎn)者組名
2.指定NameServer
地址
3.啟動(dòng)producer
4.創(chuàng)建消息對(duì)象,指定主題Topic
、Tag
和消息體等
5.發(fā)送消息
6.關(guān)閉生產(chǎn)者producer
消息消費(fèi)者
1.創(chuàng)建消費(fèi)者consumer
,制定消費(fèi)者組名
2.指定NameServe
r地址
3.創(chuàng)建監(jiān)聽(tīng)訂閱主題Topic
和Tag
等
4.處理消息
5.啟動(dòng)消費(fèi)者consumer
了解了消息發(fā)送和監(jiān)聽(tīng)的流程,我們可以開(kāi)始簡(jiǎn)單的代碼實(shí)現(xiàn)
Start
創(chuàng)建一個(gè)空項(xiàng)目 RocketMQ-study
在空項(xiàng)目下創(chuàng)建一個(gè)新模板:
簡(jiǎn)單測(cè)試
引入依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--原生api-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
測(cè)試一個(gè)簡(jiǎn)單的生產(chǎn)方法和消費(fèi)方法:
package com.wp.rocketmqdemo01.constant;
public interface MqConstant {
/**
* 生產(chǎn)者組名
*/
String PRODUCER_GROUP = "test-producer-group";
/**
* 消費(fèi)者組名
*/
String CONSUMER_GROUP = "test-consumer-group";
/**
* 主題
*/
String TOPIC = "test-topic";
/**
* NameServer地址
*/
String NAME_SRV_ADDR = "ip:9876";
}
package com.wp.rocketmqdemo01.demo;
import com.wp.rocketmqdemo01.constant.MqConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;
import java.util.List;
@Slf4j
public class SimpleTest01 {
/**
* 生產(chǎn)者
* @throws Exception
*/
@Test
public void SimpleTestProducer() throws Exception {
// 創(chuàng)建一個(gè)生產(chǎn)者(制定一個(gè)組名
DefaultMQProducer producer = new DefaultMQProducer(MqConstant.PRODUCER_GROUP);
// 指定NameServer地址,連接到NameServer
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 啟動(dòng)生產(chǎn)者
producer.start();
// 創(chuàng)建一個(gè)消息
Message message = new Message(MqConstant.TOPIC, MqConstant.TAG, "Hello RocketMQ".getBytes());
// 發(fā)送消息
SendResult sendResult = producer.send(message);
log.info("消息發(fā)送結(jié)果:{}", sendResult);
// 關(guān)閉生產(chǎn)者
producer.shutdown();
}
/**
* 消費(fèi)者
* @throws Exception
*/
@Test
public void SimpleTestConsumer() throws Exception {
// 創(chuàng)建一個(gè)消費(fèi)者(制定一個(gè)組名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConstant.CONSUMER_GROUP);
// 指定NameServer地址,連接到NameServer
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 訂閱主題 *表示訂閱所有
consumer.subscribe(MqConstant.TOPIC, "*");
// 注冊(cè)消息監(jiān)聽(tīng)器(一直監(jiān)聽(tīng),異步回調(diào)方法)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 這個(gè)就是消費(fèi)的方法 業(yè)務(wù)邏輯
log.info("我是消費(fèi)者,我正在消費(fèi)消息");
log.info(list.get(0).toString());
for(MessageExt messageExt : list) {
log.info("消費(fèi)消息:{}", new String(messageExt.getBody()));
}
log.info("消息上下文:{}", consumeConcurrentlyContext);
// 返回消費(fèi)狀態(tài)
// 如果消費(fèi)成功,返回CONSUME_SUCCESS,消息會(huì)被消費(fèi)掉,從mq出隊(duì)
// 如果消費(fèi)失敗,返回RECONSUME_LATER,消息會(huì)重新投遞,mq不會(huì)出隊(duì)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動(dòng)消費(fèi)者 這個(gè)start一定要寫(xiě)在registerMessageListener下面
consumer.start();
// 保證消費(fèi)者不退出,掛起當(dāng)前jvm
System.in.read();
}
}
分別測(cè)試后得到如下結(jié)果:
-
Broker Offset
(代理器偏移量): Broker Offset是指消息隊(duì)列中的消息在Broker(消息代理器)上的偏移位置。當(dāng)生產(chǎn)者將消息發(fā)送到Broker時(shí),每條消息都會(huì)被賦予一個(gè)唯一的偏移量,表示該消息在隊(duì)列中的位置。Broker Offset主要由消息代理器維護(hù)和管理,用于追蹤消息的存儲(chǔ)和處理情況。 -
Consumer Offset
(消費(fèi)者偏移量): Consumer Offset是指消費(fèi)者在消費(fèi)消息時(shí)的位置偏移。當(dāng)消費(fèi)者成功消費(fèi)了一條消息后,會(huì)將自己的消費(fèi)偏移量記錄下來(lái),表示下次繼續(xù)消費(fèi)消息的起始位置。消費(fèi)者需要定期更新Consumer Offset,以保證消息處理的準(zhǔn)確性和可靠性。 -
Diff Total
(差異總數(shù)): Diff Total是Broker Offset和Consumer Offset之間的差異總和。也就是說(shuō),Diff Total表示消息隊(duì)列中已經(jīng)被生產(chǎn)者發(fā)送并存儲(chǔ)在Broker上的消息數(shù)量,但尚未被消費(fèi)者消費(fèi)的消息數(shù)量。Diff Total可以用來(lái)監(jiān)控消息隊(duì)列的堆積情況,幫助發(fā)現(xiàn)消息處理速度跟不上消息產(chǎn)生速度的問(wèn)題。
最簡(jiǎn)單的測(cè)試完成!
MQ的消費(fèi)模式
可以大致分為兩種:推(Push)模式和拉(Pull)模式
- 推(Push)模式: 在推模式中,消息隊(duì)列將消息直接推送給消費(fèi)者。一旦有新的消息產(chǎn)生并發(fā)送到隊(duì)列中,隊(duì)列會(huì)立即將該消息推送給已注冊(cè)的消費(fèi)者。這樣消費(fèi)者就可以及時(shí)收到并處理消息。推模式適用于需要實(shí)時(shí)響應(yīng)和高實(shí)時(shí)性的場(chǎng)景,比如即時(shí)通訊、實(shí)時(shí)推送等。
- 拉(Pull)模式: 在拉模式中,消費(fèi)者需要主動(dòng)從消息隊(duì)列中拉取消息。消費(fèi)者需要周期性地向隊(duì)列發(fā)起請(qǐng)求,查詢是否有新的消息可供消費(fèi)。如果隊(duì)列中有新消息,隊(duì)列會(huì)將這些消息返回給消費(fèi)者,然后消費(fèi)者再對(duì)這些消息進(jìn)行處理。拉模式適用于不需要實(shí)時(shí)響應(yīng)的場(chǎng)景,比如批量處理、數(shù)據(jù)同步等。
每種消費(fèi)模式都有其適用的場(chǎng)景和優(yōu)缺點(diǎn)。推模式能夠及時(shí)將消息推送給消費(fèi)者,實(shí)現(xiàn)了實(shí)時(shí)性和低延遲,但在高并發(fā)場(chǎng)景下可能會(huì)產(chǎn)生大量推送請(qǐng)求,增加系統(tǒng)壓力。而拉模式需要消費(fèi)者主動(dòng)輪詢消息隊(duì)列,可以控制消費(fèi)的速度,但可能會(huì)導(dǎo)致消息處理不及時(shí),影響系統(tǒng)的實(shí)時(shí)性。
發(fā)送同步消息
上面的快速入門就是發(fā)送同步消息,發(fā)送過(guò)后就會(huì)有一個(gè)返回值(SEND_OK),也就是mq服務(wù)器接收到消息后返回的一個(gè)確認(rèn),這種方式非常安全,但是性能上并沒(méi)有這么高,而且在mq集群中,也是要等到所有的從機(jī)都復(fù)制了消息以后才會(huì)返回,所以針對(duì)重要的消息可以選擇這種方式
發(fā)送異步消息
異步消息通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,即發(fā)送端不能容忍長(zhǎng)時(shí)間地等待Broker的響應(yīng)。發(fā)送完以后會(huì)有一個(gè)異步消息通知
@Test
public void syncProducer() throws Exception {
// 創(chuàng)建默認(rèn)的生產(chǎn)者
DefaultMQProducer producer = new DefaultMQProducer(MqConstant.PRODUCER_GROUP+ "_sync");
// 設(shè)置nameServer地址
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 啟動(dòng)實(shí)例
producer.start();
Message msg = new Message("TopicSyncTest", ("異步消息").getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("發(fā)送成功 (后執(zhí)行)");
}
@Override
public void onException(Throwable e) {
log.info("發(fā)送失敗 (后執(zhí)行)");
}
});
log.info("先執(zhí)行");
// 掛起jvm 因?yàn)榛卣{(diào)是異步的不然測(cè)試不出來(lái)
System.in.read();
// 關(guān)閉實(shí)例
producer.shutdown();
}
發(fā)送單向消息
這種方式主要用在不關(guān)心發(fā)送結(jié)果的場(chǎng)景,這種方式吞吐量很大,但是存在消息丟失的風(fēng)險(xiǎn),例如日志信息的發(fā)送
// 發(fā)送單向消息
producer.sendOneway(msg);
發(fā)送延遲消息
消息放入mq后,過(guò)一段時(shí)間,才會(huì)被監(jiān)聽(tīng)到,然后消費(fèi)
比如下訂單業(yè)務(wù),提交了一個(gè)訂單就可以發(fā)送一個(gè)延時(shí)消息,15min后去檢查這個(gè)訂單的狀態(tài),如果還是未付款就取消訂單釋放庫(kù)存。
DelayLevel
Message msg = new Message("TopicTest", ("延遲消息").getBytes());
// 給這個(gè)消息設(shè)定一個(gè)延遲等級(jí)
// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);
// 發(fā)送單向消息
producer.send(msg);
注意
RocketMQ支持以下幾個(gè)固定的延時(shí)等級(jí),等級(jí)1就對(duì)應(yīng)1s,以此類推,最高支持2h延遲
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
投遞等級(jí)(delay level) | 延遲時(shí)間 | 投遞等級(jí)(delay level) | 延遲時(shí)間 |
---|---|---|---|
1 | 1s | 10 | 6min |
2 | 5s | 11 | 7min |
3 | 10s | 12 | 8min |
4 | 30s | 13 | 9min |
5 | 1min | 14 | 10min |
6 | 2min | 15 | 20min |
7 | 3min | 16 | 30min |
8 | 4min | 17 | 1h |
9 | 5min | 18 | 2h |
修改延時(shí)級(jí)別
RocketMQ的延遲等級(jí)可以進(jìn)行修改,以滿足自己的業(yè)務(wù)需求,可以修改/添加新的level。具體見(jiàn)[該文章](RocketMQ進(jìn)階-延時(shí)消息 - 知乎 (zhihu.com))
同時(shí)5.0支持使用時(shí)間戳來(lái)設(shè)置延遲時(shí)間定時(shí)/延時(shí)消息 | RocketMQ (apache.org)
發(fā)送批量消息
可以一次性發(fā)送一組消息,那么這一組消息會(huì)被當(dāng)做一個(gè)消息消費(fèi)
List<Message> msgs = Arrays.asList(
new Message("TopicTest", "我是一組消息的A消息".getBytes()),
new Message("TopicTest", "我是一組消息的B消息".getBytes()),
new Message("TopicTest", "我是一組消息的C消息".getBytes())
);
SendResult send = producer.send(msgs);
發(fā)送順序消息
順序消息是一種特殊類型的消息,可以保證按照發(fā)送的順序進(jìn)行消費(fèi),從而保證了消息的有序性。
在 RocketMQ 中,保證消息順序發(fā)送的關(guān)鍵是要將相關(guān)的消息發(fā)送到同一個(gè)隊(duì)列中,并且消費(fèi)者按照隊(duì)列的順序來(lái)消費(fèi)消息
以下是實(shí)現(xiàn)順序消息的步驟:
- 創(chuàng)建一個(gè)指定順序的 MessageQueueSelector。 在發(fā)送消息時(shí),你需要指定一個(gè) MessageQueueSelector 來(lái)選擇目標(biāo)消息隊(duì)列。該 Selector 將根據(jù)某種規(guī)則將相關(guān)的消息發(fā)送到同一個(gè)隊(duì)列中,保證了消息的順序性。
- 設(shè)置 MessageQueueSelector 選擇消息隊(duì)列的邏輯。 在實(shí)現(xiàn) MessageQueueSelector 接口的 select 方法中,你需要編寫(xiě)邏輯來(lái)選擇目標(biāo)隊(duì)列??梢愿鶕?jù)消息的某個(gè)屬性或者業(yè)務(wù)關(guān)聯(lián)來(lái)確定消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列。
- 發(fā)送消息時(shí)使用 MessageQueueSelector。 在發(fā)送消息時(shí),使用 producer.send(msg, selector, orderId) 方法來(lái)指定消息發(fā)送的隊(duì)列。其中,selector 參數(shù)即為你實(shí)現(xiàn)的 MessageQueueSelector 接口的實(shí)例,orderId 是一個(gè)標(biāo)識(shí)消息順序的參數(shù)。
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 當(dāng)前主題有多少個(gè)隊(duì)列
int queueNumber = mqs.size();
// 這個(gè)arg就是后面?zhèn)魅氲?orderId
Integer id = (Integer) arg;
// 用這個(gè)值去%隊(duì)列的個(gè)數(shù)得到一個(gè)隊(duì)列
int index = id % queueNumber
// 返回選擇的這個(gè)隊(duì)列即可 ,那么相同的orderId 就會(huì)被放在相同的隊(duì)列里 實(shí)現(xiàn)First In, First //Out了
return mqs.get(index);
}
}, orderId);
消費(fèi)者的監(jiān)聽(tīng) MessageListenerOrderly如下
// 注冊(cè)一個(gè)消費(fèi)監(jiān)聽(tīng) MessageListenerOrderly 是順序消費(fèi) 單線程消費(fèi)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println(new String(messageExt.getBody()));
return ConsumeOrderlyStatus.SUCCESS;
}
});
5.SpringBoot集成RocketMQ
搭建rocketmq-producer(消息生產(chǎn)者)模塊
完整的依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
修改配置文件application.yml
spring:
application:
name: rocketmq-producer
rocketmq:
name-server: ip:9876 # rocketMq的nameServer地址
producer:
group: boot-test-producer-group # 生產(chǎn)者組別
send-message-timeout: 3000 # 消息發(fā)送的超時(shí)時(shí)間
retry-times-when-send-async-failed: 2 # 異步消息發(fā)送失敗重試次數(shù)
max-message-size: 4194304 # 消息的最大長(zhǎng)度
添加測(cè)試類的內(nèi)容:
/**
* 注入rocketMQTemplate,我們使用它來(lái)操作mq
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 測(cè)試發(fā)送簡(jiǎn)單的消息
* @throws Exception
*/
@Test
public void testSimpleMsg() throws Exception {
// boot-test是topic,我是一個(gè)簡(jiǎn)單的消息是消息內(nèi)容
SendResult sendResult = rocketMQTemplate.syncSend("boot-test", "我是一個(gè)簡(jiǎn)單的消息");
// 拿到消息的發(fā)送狀態(tài)
log.info(String.valueOf(sendResult.getSendStatus()));
// 拿到消息的id
log.info(sendResult.getMsgId());
}
執(zhí)行后,可得到:
同理創(chuàng)建消費(fèi)者模塊
修改配置文件application.yml
spring:
application:
name: rocketmq-consumer
rocketmq:
name-server: 47.115.209.249:9876 # rocketMq?nameServer??
添加測(cè)試類的內(nèi)容:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 創(chuàng)建一個(gè)簡(jiǎn)單消息的監(jiān)聽(tīng)
* 1.類上添加注解@Component和@RocketMQMessageListener
* @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
* topic指定消費(fèi)的主題,consumerGroup指定消費(fèi)組,一個(gè)主題可以有多個(gè)消費(fèi)者組,一個(gè)消息可以被多個(gè)不同的組的消費(fèi)者都消費(fèi)
* 2.實(shí)現(xiàn)RocketMQListener接口,注意泛型的使用,可以為具體的類型,如果想拿到消息
* 的其他參數(shù)可以寫(xiě)成MessageExt
*/
@Component
@RocketMQMessageListener(topic = "boot-test", consumerGroup = "boot-test-consumer-group",messageModel = MessageModel.CLUSTERING)
@Slf4j
public class SimpleMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("接收到消息:{}",s);
}
}
執(zhí)行后,可得到:
添加測(cè)試類的內(nèi)容:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 創(chuàng)建一個(gè)簡(jiǎn)單消息的監(jiān)聽(tīng)
* 1.類上添加注解@Component和@RocketMQMessageListener
* @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
* topic指定消費(fèi)的主題,consumerGroup指定消費(fèi)組,一個(gè)主題可以有多個(gè)消費(fèi)者組,一個(gè)消息可以被多個(gè)不同的組的消費(fèi)者都消費(fèi)
* 2.實(shí)現(xiàn)RocketMQListener接口,注意泛型的使用,可以為具體的類型,如果想拿到消息
* 的其他參數(shù)可以寫(xiě)成MessageExt
*/
@Component
@RocketMQMessageListener(topic = "boot-test", consumerGroup = "boot-test-consumer-group",messageModel = MessageModel.CLUSTERING)
@Slf4j
public class SimpleMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("接收到消息:{}",s);
}
}
執(zhí)行后,可得到:
[外鏈圖片轉(zhuǎn)存中…(img-K8TtTKkm-1691029883688)]
[外鏈圖片轉(zhuǎn)存中…(img-flI5rjFX-1691029883688)]文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-770303.html
6.最后
從文章整體沒(méi)有涉及太深入的一些機(jī)制和原理的講解,因此僅作為入門學(xué)習(xí)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-770303.html
到了這里,關(guān)于RocketMQ的windos/linux/docker超詳細(xì)安裝及簡(jiǎn)單入門!的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!