SpringBoot 整合 RabbitMQ
概念
2007 年發(fā)布,是一個(gè)在 AMQP(高級(jí)消息隊(duì)列協(xié)議)基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),是當(dāng)前最主流的消息中間件之一。
RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議 )的開源實(shí)現(xiàn),由于erlang 語(yǔ)言的高并發(fā)特性,性能較好,本質(zhì)是個(gè)隊(duì)列,F(xiàn)IFO 先入先出,里面存放的內(nèi)容是message。
RabbitMQ 是一個(gè)消息中間件:它接收消息并且轉(zhuǎn)發(fā),就類似于一個(gè)快遞站,賣家把快遞通過(guò)快遞站,送到我們的手上,MQ也是這樣,接收并存儲(chǔ)消息,再轉(zhuǎn)發(fā)。
MQ典型應(yīng)用場(chǎng)景:
- 異步處理。把消息放入消息中間件中,等到需要的時(shí)候再去處理。
- 流量削峰。例如秒殺活動(dòng),在短時(shí)間內(nèi)訪問(wèn)量急劇增加,使用消息隊(duì)列,當(dāng)消息隊(duì)列滿了就拒絕響應(yīng),跳轉(zhuǎn)到錯(cuò)誤頁(yè)面,這樣就可以使得系統(tǒng)不會(huì)因?yàn)槌?fù)載而崩潰。
- 日志處理
- 應(yīng)用解耦。假設(shè)某個(gè)服務(wù)A需要給許多個(gè)服務(wù)(B、C、D)發(fā)送消息,當(dāng)某個(gè)服務(wù)(例如B)不需要發(fā)送消息了,服務(wù)A需要改代碼再次部署;當(dāng)新加入一個(gè)服務(wù)(服務(wù)E)需要服務(wù)A的消息的時(shí)候,也需要改代碼重新部署;另外服務(wù)A也要考慮其他服務(wù)掛掉,沒(méi)有收到消息怎么辦?要不要重新發(fā)送呢?是不是很麻煩,使用MQ發(fā)布訂閱模式,服務(wù)A只生產(chǎn)消息發(fā)送到MQ,B、C、D從MQ中讀取消息,需要A的消息就訂閱,不需要了就取消訂閱,服務(wù)A不再操心其他的事情,使用這種方式可以降低服務(wù)或者系統(tǒng)之間的耦合。
首先先介紹一個(gè)簡(jiǎn)單的一個(gè)消息推送到接收的流程,提供一個(gè)簡(jiǎn)單的圖:
黃色的圈圈就是我們的消息推送服務(wù),將消息推送到 中間方框里面也就是 rabbitMq的服務(wù)器,然后經(jīng)過(guò)服務(wù)器里面的交換機(jī)、隊(duì)列等各種關(guān)系(后面會(huì)詳細(xì)講)將數(shù)據(jù)處理入列后,最終右邊的藍(lán)色圈圈消費(fèi)者獲取對(duì)應(yīng)監(jiān)聽(tīng)的消息。
常用的交換機(jī)有以下三種,因?yàn)橄M(fèi)者是從隊(duì)列獲取信息的,隊(duì)列是綁定交換機(jī)的(一般),所以對(duì)應(yīng)的消息推送/接收模式也會(huì)有以下幾種:
Direct Exchange
直連型交換機(jī),根據(jù)消息攜帶的路由鍵將消息投遞給對(duì)應(yīng)隊(duì)列。
大致流程,有一個(gè)隊(duì)列綁定到一個(gè)直連交換機(jī)上,同時(shí)賦予一個(gè)路由鍵 routing key 。
然后當(dāng)一個(gè)消息攜帶著路由值為X,這個(gè)消息通過(guò)生產(chǎn)者發(fā)送給交換機(jī)時(shí),交換機(jī)就會(huì)根據(jù)這個(gè)路由值X去尋找綁定值也是X的隊(duì)列。
Fanout Exchange
廣播式交換機(jī),這個(gè)交換機(jī)沒(méi)有路由鍵概念,就算你綁了路由鍵也是無(wú)視的。 這個(gè)交換機(jī)在接收到消息后,會(huì)直接轉(zhuǎn)發(fā)到綁定到它上面的所有隊(duì)列。
Topic Exchange
主題交換機(jī),這個(gè)交換機(jī)其實(shí)跟直連交換機(jī)流程差不多,但是它的特點(diǎn)就是在它的路由鍵和綁定鍵之間是有規(guī)則的。
其路由規(guī)則為:routingkey必須為單詞列表,單詞之間以點(diǎn)號(hào)分隔開,*號(hào)代表一個(gè)單詞,#號(hào)可以替代零個(gè)或多個(gè)單詞。
舉例:
路由*.*.key1綁定隊(duì)列Q1,如:red.blue.key1、black.green.key1等,通過(guò)這些路由都可以發(fā)送消息給隊(duì)列Q1
路由#.key2.*綁定隊(duì)列Q2,如:a.b.c.key2.red、orange.key2.blue、key2.red等,通過(guò)這些路由都可以發(fā)送消息給隊(duì)列Q2
路由key3.#綁定隊(duì)列Q2,如:key3.red.blue、key3、key3.green、key3.black.blue.red等,通過(guò)這些路由都可以將消息發(fā)送給Q2
主流的消息隊(duì)列對(duì)比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
單機(jī)吞吐量 | 萬(wàn)級(jí),比 RocketMQ、Kafka 低一個(gè)數(shù)量級(jí) | 同 ActiveMQ | 10 萬(wàn)級(jí),支撐高吞吐 | 10 萬(wàn)級(jí),高吞吐,一般配合大數(shù)據(jù)類的系統(tǒng)來(lái)進(jìn)行實(shí)時(shí)數(shù)據(jù)計(jì)算、日志采集等場(chǎng)景 |
topic 數(shù)量對(duì)吞吐量的影響 | topic 可以達(dá)到幾百/幾千的級(jí)別,吞吐量會(huì)有較小幅度的下降,這是 RocketMQ 的一大優(yōu)勢(shì),在同等機(jī)器下,可以支撐大量的 topic | topic 從幾十到幾百個(gè)時(shí)候,吞吐量會(huì)大幅度下降,在同等機(jī)器下,Kafka 盡量保證 topic 數(shù)量不要過(guò)多,如果要支撐大規(guī)模的 topic,需要增加更多的機(jī)器資源 | ||
時(shí)效性 | ms 級(jí) | 微秒級(jí),這是 RabbitMQ 的一大特點(diǎn),延遲最低 | ms 級(jí) | 延遲在 ms 級(jí)以內(nèi) |
可用性 | 高,基于主從架構(gòu)實(shí)現(xiàn)高可用 | 同 ActiveMQ | 非常高,分布式架構(gòu) | 非常高,分布式,一個(gè)數(shù)據(jù)多個(gè)副本,少數(shù)機(jī)器宕機(jī),不會(huì)丟失數(shù)據(jù),不會(huì)導(dǎo)致不可用 |
消息可靠性 | 有較低的概率丟失數(shù)據(jù) | 基本不丟 | 經(jīng)過(guò)參數(shù)優(yōu)化配置,可以做到 0 丟失 | 同 RocketMQ |
功能支持 | MQ 領(lǐng)域的功能極其完備 | 基于 erlang 開發(fā),并發(fā)能力很強(qiáng),性能極好,延時(shí)很低 | MQ 功能較為完善,還是分布式的,擴(kuò)展性好 | 功能較為簡(jiǎn)單,主要支持簡(jiǎn)單的 MQ 功能,在大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算以及日志采集被大規(guī)模使用 |
SpringBoot整合RabbitMQ
? RabbitMQ是MQ產(chǎn)品中的目前較為流行的產(chǎn)品之一,它遵從AMQP協(xié)議。RabbitMQ的底層實(shí)現(xiàn)語(yǔ)言使用的是Erlang,所以安裝RabbitMQ需要先安裝Erlang。
Erlang安裝
? windows版安裝包下載地址:https??/www.erlang.org/downloads
? 下載完畢后得到exe安裝文件,一鍵傻瓜式安裝,安裝完畢需要重啟,需要重啟,需要重啟。
? 安裝的過(guò)程中可能會(huì)出現(xiàn)依賴Windows組件的提示,根據(jù)提示下載安裝即可,都是自動(dòng)執(zhí)行的,如下:
? Erlang安裝后需要配置環(huán)境變量,否則RabbitMQ將無(wú)法找到安裝的Erlang。需要配置項(xiàng)如下,作用等同JDK配置環(huán)境變量的作用。
- ERLANG_HOME
- PATH
安裝 rabbitMQ
? csdn 安裝教程:https://blog.csdn.net/tirster/article/details/121938987
? windows版安裝包下載地址:https://rabbitmq.com/install-windows.html
? 下載完畢后得到exe安裝文件,一鍵傻瓜式安裝,安裝完畢后會(huì)得到如下文件
啟動(dòng)服務(wù)器
rabbitmq-service.bat start # 啟動(dòng)服務(wù)
rabbitmq-service.bat stop # 停止服務(wù)
rabbitmqctl status # 查看服務(wù)狀態(tài)
? 運(yùn)行sbin目錄下的rabbitmq-service.bat命令即可,start參數(shù)表示啟動(dòng),stop參數(shù)表示退出,默認(rèn)對(duì)外服務(wù)端口15672。
? 注意:?jiǎn)?dòng)rabbitmq的過(guò)程實(shí)際上是開啟rabbitmq對(duì)應(yīng)的系統(tǒng)服務(wù),需要管理員權(quán)限方可執(zhí)行。
? 說(shuō)明:有沒(méi)有感覺(jué)15672的服務(wù)端口很熟悉?activemq與rabbitmq有一個(gè)端口沖突問(wèn)題,學(xué)習(xí)階段無(wú)論操作哪一個(gè)?請(qǐng)確保另一個(gè)處于關(guān)閉狀態(tài)。
? 說(shuō)明:不喜歡命令行的小伙伴可以使用任務(wù)管理器中的服務(wù)頁(yè),找到RabbitMQ服務(wù),使用鼠標(biāo)右鍵菜單控制服務(wù)的啟停。
訪問(wèn)web管理服務(wù)
? RabbitMQ也提供有web控制臺(tái)服務(wù),但是此功能是一個(gè)插件,需要先啟用才可以使用。
rabbitmq-plugins.bat list # 查看當(dāng)前所有插件的運(yùn)行狀態(tài)
rabbitmq-plugins.bat enable rabbitmq_management # 啟動(dòng)rabbitmq_management插件
? 啟動(dòng)插件后可以在插件運(yùn)行狀態(tài)中查看是否運(yùn)行,運(yùn)行后通過(guò)瀏覽器即可打開服務(wù)后臺(tái)管理界面
http://localhost:15672
? web管理服務(wù)默認(rèn)端口15672,訪問(wèn)后可以打開RabbitMQ的管理界面,如下:
[圖片加載失敗]
? 首先輸入訪問(wèn)用戶名和密碼,初始化用戶名和密碼相同,均為:guest,成功登錄后進(jìn)入管理后臺(tái)界面,如下:
[圖片加載失敗]
整合(direct模型)
? RabbitMQ滿足AMQP協(xié)議,因此不同的消息模型對(duì)應(yīng)的制作不同,先使用最簡(jiǎn)單的direct模型開發(fā)。
步驟①:導(dǎo)入springboot整合amqp的starter,amqp協(xié)議默認(rèn)實(shí)現(xiàn)為rabbitmq方案
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步驟②:配置RabbitMQ的服務(wù)器地址
spring:
rabbitmq:
host: localhost
port: 5672
步驟③:初始化直連模式系統(tǒng)設(shè)置
? 由于RabbitMQ不同模型要使用不同的交換機(jī),因此需要先初始化RabbitMQ相關(guān)的對(duì)象,例如隊(duì)列,交換機(jī)等
@Configuration
public class RabbitConfigDirect {
// 聲明隊(duì)列
@Bean
public Queue directQueue(){
return new Queue("direct_queue");
}
@Bean
public Queue directQueue2(){
return new Queue("direct_queue2");
}
// 聲明交換機(jī)
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
// 聲明交換機(jī)和隊(duì)列的綁定關(guān)系
@Bean
public Binding bindingDirect(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
}
@Bean
public Binding bindingDirect2(){
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
}
}
? 隊(duì)列Queue與直連交換機(jī)DirectExchange創(chuàng)建后,還需要綁定他們之間的關(guān)系Binding,這樣就可以通過(guò)交換機(jī)操作對(duì)應(yīng)隊(duì)列。
步驟④:使用AmqpTemplate操作RabbitMQ
@Service
public class MessageServiceRabbitmqDirectImpl implements MessageService {
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待發(fā)送短信的訂單已納入處理隊(duì)列(rabbitmq direct),id:"+id);
amqpTemplate.convertAndSend("directExchange","direct",id);
}
}
? amqp協(xié)議中的操作API接口名稱看上去和jms規(guī)范的操作API接口很相似,但是傳遞參數(shù)差異很大。
步驟⑤:使用消息監(jiān)聽(tīng)器在服務(wù)器啟動(dòng)后,監(jiān)聽(tīng)指定位置,當(dāng)消息出現(xiàn)后,立即消費(fèi)消息
@Component
public class MessageListener {
@RabbitListener(queues = "direct_queue")
public void receive(String id){
System.out.println("已完成短信發(fā)送業(yè)務(wù)(rabbitmq direct),id:"+id);
}
}
? 使用注解@RabbitListener定義當(dāng)前方法監(jiān)聽(tīng)RabbitMQ中指定名稱的消息隊(duì)列。
整合(topic模型)
步驟①:同上
步驟②:同上
步驟③:初始化主題模式系統(tǒng)設(shè)置
@Configuration
public class RabbitConfigTopic {
@Bean
public Queue topicQueue(){
return new Queue("topic_queue");
}
@Bean
public Queue topicQueue2(){
return new Queue("topic_queue2");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Binding bindingTopic(){
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id");
}
@Bean
public Binding bindingTopic2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.orders.*");
}
}
? 主題模式支持routingKey匹配模式,*表示匹配一個(gè)單詞,#表示匹配任意內(nèi)容,這樣就可以通過(guò)主題交換機(jī)將消息分發(fā)到不同的隊(duì)列中,詳細(xì)內(nèi)容請(qǐng)參看RabbitMQ系列課程。
匹配鍵 | topic.*.* | topic.# |
---|---|---|
topic.order.id | true | true |
order.topic.id | false | false |
topic.sm.order.id | false | true |
topic.sm.id | false | true |
topic.id.order | true | true |
topic.id | false | true |
topic.order | false | true |
步驟④:使用AmqpTemplate操作RabbitMQ
@Service
public class MessageServiceRabbitmqTopicImpl implements MessageService {
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待發(fā)送短信的訂單已納入處理隊(duì)列(rabbitmq topic),id:"+id);
amqpTemplate.convertAndSend("topicExchange","topic.orders.id",id);
}
}
? 發(fā)送消息后,根據(jù)當(dāng)前提供的routingKey與綁定交換機(jī)時(shí)設(shè)定的routingKey進(jìn)行匹配,規(guī)則匹配成功消息才會(huì)進(jìn)入到對(duì)應(yīng)的隊(duì)列中。
步驟⑤:使用消息監(jiān)聽(tīng)器在服務(wù)器啟動(dòng)后,監(jiān)聽(tīng)指定隊(duì)列
@Component
public class MessageListener {
@RabbitListener(queues = "topic_queue")
public void receive(String id){
System.out.println("已完成短信發(fā)送業(yè)務(wù)(rabbitmq topic 1),id:"+id);
}
@RabbitListener(queues = "topic_queue2")
public void receive2(String id){
System.out.println("已完成短信發(fā)送業(yè)務(wù)(rabbitmq topic 22222222),id:"+id);
}
}
? 使用注解@RabbitListener定義當(dāng)前方法監(jiān)聽(tīng)RabbitMQ中指定名稱的消息隊(duì)列。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-552893.html
總結(jié)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-552893.html
- springboot整合RabbitMQ提供了AmqpTemplate對(duì)象作為客戶端操作消息隊(duì)列
- 操作ActiveMQ需要配置ActiveMQ服務(wù)器地址,默認(rèn)端口5672
- 企業(yè)開發(fā)時(shí)通常使用監(jiān)聽(tīng)器來(lái)處理消息隊(duì)列中的消息,設(shè)置監(jiān)聽(tīng)器使用注解@RabbitListener
- RabbitMQ有5種消息模型,使用的隊(duì)列相同,但是交換機(jī)不同。交換機(jī)不同,對(duì)應(yīng)的消息進(jìn)入的策略也不同
到了這里,關(guān)于SpringBoot 整合RabbitMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!