?消息中間件在現(xiàn)代分布式系統(tǒng)中起著關(guān)鍵作用,它們提供了一種可靠且高效的方法來(lái)進(jìn)行異步通信和解耦。在這篇博客中,我們將重點(diǎn)介紹 RabbitMQ,一個(gè)廣泛使用的開(kāi)源消息中間件。我們將深入探討 RabbitMQ 的特性、工作原理以及如何在應(yīng)用程序中使用它來(lái)實(shí)現(xiàn)可靠的消息傳遞。
一、RabbitMQ 簡(jiǎn)介
RabbitMQ 是基于 AMQP(高級(jí)消息隊(duì)列協(xié)議)的開(kāi)源消息中間件。它提供了一個(gè)可靠的、靈活的、可擴(kuò)展的消息傳遞機(jī)制,廣泛應(yīng)用于各行各業(yè)。RabbitMQ 的核心思想是生產(chǎn)者將消息發(fā)送到交換機(jī),交換機(jī)根據(jù)路由規(guī)則將消息傳遞給隊(duì)列,然后消費(fèi)者從隊(duì)列中獲取并處理消息。
二、相關(guān)概念
RabbitMQ 是一個(gè)開(kāi)源的消息中間件,它是由 Erlang 語(yǔ)言編寫(xiě)的,并且實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)。作為一種可靠、靈活和可擴(kuò)展的消息傳遞系統(tǒng),RabbitMQ 提供了在應(yīng)用程序之間傳輸數(shù)據(jù)的可靠機(jī)制。
下面是 RabbitMQ 的一些關(guān)鍵特性和概念的詳解:
-
消息隊(duì)列:RabbitMQ 使用消息隊(duì)列來(lái)存儲(chǔ)和傳遞消息。消息隊(duì)列是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),它將消息暫時(shí)存儲(chǔ)在其中,直到消費(fèi)者準(zhǔn)備好接收并處理它們。
-
生產(chǎn)者和消費(fèi)者:消息的發(fā)送者稱(chēng)為生產(chǎn)者,而消息的接收者稱(chēng)為消費(fèi)者。生產(chǎn)者將消息發(fā)送到隊(duì)列中,而消費(fèi)者從隊(duì)列中獲取消息并進(jìn)行處理。
-
隊(duì)列:隊(duì)列是 RabbitMQ 的核心部分,它是消息的存儲(chǔ)和傳遞載體。生產(chǎn)者將消息發(fā)送到隊(duì)列中,而消費(fèi)者則從隊(duì)列中獲取消息。隊(duì)列可以持久化,這意味著即使 RabbitMQ 服務(wù)器關(guān)閉,消息也不會(huì)丟失。
-
交換機(jī)(Exchange):交換機(jī)是消息的路由器,它將消息從生產(chǎn)者發(fā)送到隊(duì)列。它根據(jù)特定的規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列。RabbitMQ 提供了不同類(lèi)型的交換機(jī),包括直連交換機(jī)、主題交換機(jī)、廣播交換機(jī)等。
-
綁定(Binding):綁定將交換機(jī)和隊(duì)列關(guān)聯(lián)起來(lái),以定義消息在交換機(jī)和隊(duì)列之間的路由規(guī)則。綁定規(guī)定了消息應(yīng)該如何從交換機(jī)路由到隊(duì)列。
-
路由鍵(Routing Key):路由鍵是生產(chǎn)者在發(fā)送消息時(shí)與消息一起指定的屬性。交換機(jī)根據(jù)路由鍵來(lái)確定將消息路由到哪個(gè)隊(duì)列。
-
持久化:當(dāng)隊(duì)列或消息被標(biāo)記為持久化時(shí),它們會(huì)被保存到磁盤(pán)上,以防止在 RabbitMQ 重啟后丟失數(shù)據(jù)。
-
發(fā)布-訂閱模式:RabbitMQ 支持發(fā)布-訂閱模式,其中一個(gè)生產(chǎn)者發(fā)送消息到交換機(jī),交換機(jī)將消息廣播給所有與其綁定的隊(duì)列,然后每個(gè)隊(duì)列的消費(fèi)者都可以接收并處理消息。
-
ACK 機(jī)制:消費(fèi)者可以使用 ACK 機(jī)制告知 RabbitMQ 已經(jīng)成功接收和處理了消息。只有在消費(fèi)者明確確認(rèn)之后,RabbitMQ 才會(huì)將消息從隊(duì)列中刪除。
-
消息確認(rèn)和持久化:RabbitMQ 允許生產(chǎn)者在發(fā)送消息時(shí)請(qǐng)求確認(rèn)。如果消息無(wú)法成功路由到隊(duì)列,或者在路由過(guò)程中發(fā)生錯(cuò)誤,RabbitMQ 將通知生產(chǎn)者。此外,消息和隊(duì)列的持久化可以確保即使在 RabbitMQ 重啟后也不會(huì)丟失數(shù)據(jù)。
總之,RabbitMQ 是一個(gè)可靠和靈活的消息中間件,它以消息隊(duì)列作為核心,使用交換機(jī)、隊(duì)列、綁定等概念來(lái)進(jìn)行消息的路由和傳遞。它提供了高度可靠的消息傳遞機(jī)制和豐富的特性,廣泛用于分布式系統(tǒng)、微服務(wù)架構(gòu)、任務(wù)隊(duì)列等場(chǎng)景中,幫助應(yīng)用程序?qū)崿F(xiàn)解耦、異步通信和可靠性保證。
三、RabbitMQ 的工作原理
RabbitMQ 的工作原理可以簡(jiǎn)單地概括為以下幾個(gè)步驟:
1、生產(chǎn)者發(fā)送消息到交換機(jī):
生產(chǎn)者通過(guò)連接到 RabbitMQ,并將消息發(fā)送到預(yù)定義的交換機(jī)。消息可以包含任何結(jié)構(gòu)化數(shù)據(jù),如 JSON、XML 等格式。
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
2、交換機(jī)將消息路由到隊(duì)列:
交換機(jī)根據(jù)預(yù)定義的路由規(guī)則將消息路由到一個(gè)或多個(gè)綁定到它上面的隊(duì)列。路由規(guī)則可以根據(jù)完全匹配、模式匹配等方式進(jìn)行。
channel.queueBind(queueName, exchangeName, routingKey);
3、消費(fèi)者從隊(duì)列中獲取消息:
消費(fèi)者通過(guò)連接到 RabbitMQ,并訂閱感興趣的隊(duì)列。一旦有消息到達(dá)隊(duì)列,RabbitMQ 將立即將該消息推送給訂閱的消費(fèi)者進(jìn)行處理。
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
});
4、消費(fèi)者處理消息并發(fā)送確認(rèn):
消費(fèi)者獲取到消息后,可以進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理。一旦消息被成功處理,消費(fèi)者將發(fā)送確認(rèn)給 RabbitMQ,告知消息已經(jīng)被消費(fèi),RabbitMQ 可以安全刪除該消息。
生產(chǎn)者將消息發(fā)送到名為 "my_exchange" 的交換機(jī),并通過(guò)路由鍵 "my_routing_key" 將消息路由到名為 "my_queue" 的隊(duì)列。消費(fèi)者訂閱了 "my_queue" 隊(duì)列,并在收到消息時(shí)調(diào)用回調(diào)函數(shù)進(jìn)行處理。
四、RabbitMQ 的特性
RabbitMQ 提供了許多強(qiáng)大的特性,使其成為一個(gè)廣泛使用的消息中間件:
- 持久化:RabbitMQ 可以將消息和隊(duì)列持久化到磁盤(pán),即使在服務(wù)器重啟后也不會(huì)丟失消息。
- 靈活的路由規(guī)則:通過(guò)使用不同的交換機(jī)類(lèi)型和路由鍵,可以實(shí)現(xiàn)精確的消息路由策略。
- 可靠性和可恢復(fù)性:RabbitMQ 提供了多種保證消息可靠傳遞的機(jī)制,如消息確認(rèn)(acknowledgement)、事務(wù)、發(fā)布者確認(rèn)等。
- 可擴(kuò)展性:RabbitMQ 支持分布式部署和集群模式,可以實(shí)現(xiàn)高吞吐量和高可用性。
- 多語(yǔ)言客戶(hù)端:RabbitMQ 提供了多種官方支持的客戶(hù)端庫(kù),如 Java、Python、Ruby 等,方便開(kāi)發(fā)者在不同的語(yǔ)言環(huán)境下使用。
五、RabbitMQ 在實(shí)際應(yīng)用中的應(yīng)用場(chǎng)景
參考文章:
【RabbitMQ】什么是RabbitMQ?RabbitMQ有什么用?應(yīng)用場(chǎng)景有那些?_路遙葉子的博客-CSDN博客
RabbitMQ 在各種場(chǎng)景中都有廣泛的應(yīng)用,包括但不限于:
- 異步任務(wù)處理:將需要耗時(shí)較長(zhǎng)的任務(wù)發(fā)送到 RabbitMQ,并由消費(fèi)者異步執(zhí)行,以提高系統(tǒng)的響應(yīng)性能和可伸縮性。
- 事件驅(qū)動(dòng)架構(gòu):通過(guò)使用 RabbitMQ 來(lái)實(shí)現(xiàn)事件的發(fā)布與訂閱,不同的組件可以通過(guò)訂閱感興趣的事件來(lái)解耦。
- 應(yīng)用解耦與流量控制:通過(guò)引入消息中間件,不同的應(yīng)用程序可以進(jìn)行解耦,并實(shí)現(xiàn)流量控制、服務(wù)降級(jí)等機(jī)制。
- 日志收集、分析:使用 RabbitMQ 將分布式系統(tǒng)中的日志消息發(fā)送到中央日志服務(wù)器進(jìn)行集中管理和分析。
?5.1??服務(wù)間解耦
用戶(hù)訂單,庫(kù)存處理?!痉?wù)間解耦】
使用MQ前:
系統(tǒng)正常時(shí),用戶(hù)下單,訂單系統(tǒng)調(diào)用庫(kù)存系統(tǒng)進(jìn)行刪減操作,操作成功,將成返回消息,提醒下單成功。系統(tǒng)異常時(shí),庫(kù)存系統(tǒng)將無(wú)法訪問(wèn),導(dǎo)致訂單刪減操作無(wú)法執(zhí)行,最終導(dǎo)致下單失敗。
使用MQ后:
訂單系統(tǒng)和庫(kù)存系統(tǒng)之間不在互相影響,獨(dú)立運(yùn)行,達(dá)到了應(yīng)用解耦的目的。訂單系統(tǒng)只需要將下單消息寫(xiě)入MQ,就可以直接執(zhí)行下一步操作。這時(shí)即使庫(kù)存系統(tǒng)出現(xiàn)異常也不會(huì)影響訂單系統(tǒng)的操作,且下單的庫(kù)存刪減記錄,將會(huì)被永久保存到MQ中,直到庫(kù)存系統(tǒng)恢復(fù)正常,從MQ中訂閱下單消息,進(jìn)行消費(fèi)成功為止。
使用MQ前:
使用MQ后:
5.2?實(shí)現(xiàn)異步通信
用戶(hù)注冊(cè),發(fā)送手機(jī)短信,郵件?!緦?shí)現(xiàn)異步通信】
使用MQ前:
整個(gè)操作流程,全部在主線程完成。點(diǎn)擊用戶(hù)注冊(cè) --》 入庫(kù)添加用戶(hù) --》發(fā)送郵件 --》發(fā)送短信。每一步都需要等待上一步完成后才能執(zhí)行。且每一步操作的響應(yīng)時(shí)間不固定,如果請(qǐng)求過(guò)多,會(huì)導(dǎo)致主線程請(qǐng)求耗時(shí)很長(zhǎng),響應(yīng)慢,甚至?xí)?dǎo)致死機(jī)的情況出現(xiàn),嚴(yán)重影響了用戶(hù)的體驗(yàn)。
使用MQ后:
主線程只需要處理耗時(shí)較低的入庫(kù)操作,然后把需要處理的消息寫(xiě)進(jìn)MQ消息隊(duì)列中,然后由不同的獨(dú)立的郵件系統(tǒng)和發(fā)短信系統(tǒng),同時(shí)訂閱消息隊(duì)列中的消息進(jìn)行消費(fèi)。這樣通過(guò)消息隊(duì)列作為一個(gè)中間人去保存和傳遞消息,不僅僅耗時(shí)低消耗的資源也很少且單個(gè)服務(wù)器能夠承受的并發(fā)請(qǐng)求將更多。
?
5.3?流量削峰
商品秒殺和搶購(gòu)?!玖髁肯鞣濉?/strong>
流量削峰是消息隊(duì)列中常用的場(chǎng)景 一般在秒殺或團(tuán)購(gòu)活動(dòng)中使用廣泛。
使用MQ前:對(duì)于秒殺、搶購(gòu)活動(dòng),用戶(hù)訪問(wèn)所產(chǎn)生的流量會(huì)很大,甚至?xí)谕粫r(shí)間段出現(xiàn)上萬(wàn)上億條請(qǐng)求,這股瞬間的流量暴漲,我們的應(yīng)用系統(tǒng)配置是無(wú)法承受的,會(huì)導(dǎo)致系統(tǒng)直接崩潰死機(jī)。
例如:A系統(tǒng)平時(shí)每秒請(qǐng)求100個(gè),系統(tǒng)穩(wěn)定運(yùn)行; 但是晚上8點(diǎn)有秒殺活動(dòng) ,每秒并發(fā)增至1萬(wàn)條 ,系統(tǒng)最大處理每秒1000條 于是導(dǎo)致系統(tǒng)崩潰。?
使用MQ后:我們?cè)诖罅坑脩?hù)進(jìn)行秒殺請(qǐng)求時(shí),將那個(gè)巨大的流量請(qǐng)求拒在系統(tǒng)業(yè)務(wù)處理的上層,并將其轉(zhuǎn)移至MQ中,而不是直接涌入我們的接口。在這里MQ消息隊(duì)列起到了緩存作用。
例如:100萬(wàn)用戶(hù)在高峰期,每秒請(qǐng)求5000個(gè),將這5000個(gè)請(qǐng)求寫(xiě)入MQ系統(tǒng)每秒只能處理2000請(qǐng)求,因?yàn)镸ySQL只能處理2000個(gè)請(qǐng)求 ; 系統(tǒng)每秒拉取2000個(gè)請(qǐng)求 不超過(guò)自己的處理能力即可。
使用MQ前:
?使用MQ后:
5.4 其他應(yīng)用場(chǎng)景:
1、訂單處理系統(tǒng):
在一個(gè)電子商務(wù)平臺(tái)中,可以使用 RabbitMQ 來(lái)處理訂單。當(dāng)用戶(hù)下單時(shí),訂單信息被發(fā)布到 RabbitMQ 的交換機(jī)中,然后相關(guān)的消費(fèi)者從隊(duì)列中獲取訂單消息并進(jìn)行處理,如驗(yàn)證訂單、庫(kù)存管理、支付等。
2、日志收集與分發(fā):
假設(shè)有多個(gè)應(yīng)用程序生成日志,并希望將它們集中處理和存儲(chǔ)。每個(gè)應(yīng)用程序可以將日志消息發(fā)布到一個(gè)名為 "log_exchange" 的交換機(jī)中,然后有不同的消費(fèi)者訂閱該交換機(jī),并將日志消息寫(xiě)入數(shù)據(jù)庫(kù)或發(fā)送到日志分析系統(tǒng)。
3、實(shí)時(shí)數(shù)據(jù)傳輸:
如果有一個(gè)實(shí)時(shí)監(jiān)控系統(tǒng),需要將傳感器數(shù)據(jù)實(shí)時(shí)傳輸?shù)奖O(jiān)控平臺(tái)進(jìn)行處理和可視化展示。傳感器將數(shù)據(jù)發(fā)布到 RabbitMQ 的交換機(jī)中,監(jiān)控平臺(tái)的消費(fèi)者訂閱交換機(jī)并處理數(shù)據(jù),從而實(shí)現(xiàn)實(shí)時(shí)監(jiān)控和報(bào)警功能。
4、異步任務(wù)處理:
假設(shè)有一個(gè)應(yīng)用程序需要處理大量耗時(shí)的任務(wù),如圖像處理、PDF 轉(zhuǎn)換等。應(yīng)用程序?qū)⑷蝿?wù)發(fā)布到 RabbitMQ 的隊(duì)列中,然后有多個(gè)工作節(jié)點(diǎn)作為消費(fèi)者從隊(duì)列中獲取任務(wù)并進(jìn)行處理,以實(shí)現(xiàn)任務(wù)的并行處理和減輕主應(yīng)用程序的壓力。
5、消息通知系統(tǒng):
?假設(shè)在一個(gè)訂閱系統(tǒng)中,用戶(hù)可以訂閱不同的主題或事件。當(dāng)有新的消息發(fā)布時(shí),RabbitMQ 會(huì)將消息路由到對(duì)應(yīng)的隊(duì)列,然后訂閱該隊(duì)列的用戶(hù)會(huì)收到相應(yīng)的通知。這種方式可以用于實(shí)現(xiàn)郵件訂閱、新聞推送等功能。
6、微服務(wù)架構(gòu):
在一個(gè)微服務(wù)架構(gòu)中,不同的服務(wù)之間可能需要進(jìn)行消息傳遞和協(xié)作。使用 RabbitMQ 可以實(shí)現(xiàn)服務(wù)之間的解耦和異步通信,每個(gè)服務(wù)通過(guò)交換機(jī)和隊(duì)列收發(fā)消息,從而實(shí)現(xiàn)微服務(wù)之間的松耦合。
六、?RabbitMQ 安裝
官網(wǎng)地址:
RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
6.1 brew安裝
brew update #更新一下homebrew
brew install rabbitmq ? #安裝rabbitMQ?
安裝結(jié)果:
==> Caveats
==> rabbitmq
Management Plugin enabled by default at http://localhost:15672To restart rabbitmq after an upgrade:
? brew services restart rabbitmq
Or, if you don't want/need a background service you can just run:
? CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
?rabbitmq 的安裝路徑:
/opt/homebrew/opt/rabbitmq
6.2 、配置環(huán)境變量
1、
vi ~/.bash_profile
2、?
export RABBIT_HOME=${PATH}:/opt/homebrew/opt/rabbitmq
export PATH=${PATH}:$RABBIT_HOME/sbin
3、?
source ~/.bash_profile
6.3 、啟動(dòng)RabbitMQ
1、前臺(tái)運(yùn)行
rabbitmq-server
2、后臺(tái)運(yùn)行
rabbitmq-server -detached
3、查看運(yùn)行狀態(tài)
rabbitmqctl status
4、開(kāi)始 Web插件
rabbitmq-plugins enable rabbitmq_management
5、重啟
rabbitmq-server restart
5、關(guān)閉
rabbitmqctl stop
6.4、訪問(wèn)MQ
1、瀏覽器地址
http://localhost:15672/
默認(rèn)用戶(hù)名和密碼為guest
添加用戶(hù)
rabbitmqctl add_user miaojiang 123
設(shè)置用戶(hù)為管理員
rabbitmqctl set_user_tags miaojiang administrator
配置用戶(hù)可以遠(yuǎn)程登錄
rabbitmqctl set_permissions -p "/" miaojaing ".*" ".*" ".*"
查看新添加的賬戶(hù)
rabbitmqctl list_users
查看用于的權(quán)限
rabbitmqctl list_permissions -p /
七、Spring Boot 項(xiàng)目應(yīng)用RabbitMQ
7.1、添加Maven依賴(lài):
在你的項(xiàng)目的pom.xml文件中添加RabbitMQ客戶(hù)端庫(kù)的依賴(lài)
<!--AMQP依賴(lài),包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.2、配置RabbitMQ連接:
在Spring Boot的配置文件(application.properties 或 application.yml)中添加RabbitMQ的連接信息。
application.properties:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
在application.yml配置mq的參數(shù):
spring:
rabbitmq:
#設(shè)置RabbitMQ的IP地址
host: localhost
#設(shè)置rabbitmq服務(wù)器用戶(hù)名
username: guest
#設(shè)置rabbitmq服務(wù)器密碼
password: guest
#設(shè)置rabbitmq服務(wù)器連接端口
port: 5672
7.3?創(chuàng)建交換機(jī)
自定義交換機(jī)名稱(chēng)
創(chuàng)建名為“myExchange”的交換機(jī)
package com.example.usermanagement.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/*
使用 @Configuration 注解創(chuàng)建一個(gè)配置類(lèi),并通過(guò) @Bean 注解創(chuàng)建了一個(gè)名為 declareExchange 的方法,用于聲明創(chuàng)建交換機(jī)。請(qǐng)根據(jù)實(shí)際情況修改交換機(jī)名稱(chēng)、類(lèi)型和持久化設(shè)置。
*/
public static final String EXCHANGE_NAME = "myExchange";
@Bean
public Exchange declareExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
}
7.4?創(chuàng)建消息發(fā)送者
創(chuàng)建消息發(fā)送者:創(chuàng)建一個(gè)消息發(fā)送者的類(lèi),用于發(fā)送消息到RabbitMQ
package com.example.usermanagement.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageSender{
private final AmqpTemplate amqpTemplate;
private final String exchangeName = "myExchange"; // 自定義交換機(jī)名稱(chēng)
@Autowired
public MessageSender(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
public void sendMessage(Object message) {
amqpTemplate.convertAndSend(exchangeName, "", message); // 發(fā)送消息到默認(rèn)交換機(jī)和空路由鍵
}
}
注意:
?sendMessage 類(lèi)型使用的是Object類(lèi)型
7.5?RabbitMQ管理后臺(tái)添加對(duì)列
步驟:
-
打開(kāi)瀏覽器,輸入RabbitMQ管理后臺(tái)的URL。默認(rèn)情況下,該URL為
http://localhost:15672/
。請(qǐng)確保你的RabbitMQ服務(wù)器正在運(yùn)行,并且端口號(hào)正確。 -
輸入用戶(hù)名和密碼以登錄到RabbitMQ管理后臺(tái)。默認(rèn)情況下,用戶(hù)名為
guest
,密碼也為guest
。如果你修改過(guò)用戶(hù)名和密碼,請(qǐng)使用你的自定義憑據(jù)進(jìn)行登錄。 -
成功登錄后,你將看到RabbitMQ管理后臺(tái)的主界面。在頂部導(dǎo)航欄中,選擇
Queues
選項(xiàng)卡。 -
在
Queues
頁(yè)面上,你將看到已經(jīng)存在的隊(duì)列列表。如果你想要?jiǎng)?chuàng)建一個(gè)新隊(duì)列,請(qǐng)點(diǎn)擊Add a new queue
按鈕。 -
在添加隊(duì)列的頁(yè)面上,填寫(xiě)以下信息:
-
Name
:隊(duì)列的名稱(chēng)。為隊(duì)列提供一個(gè)唯一的名稱(chēng)。(如myQueue) -
Durability
:隊(duì)列的持久性。選擇是或否,以指定隊(duì)列是否應(yīng)該在RabbitMQ服務(wù)重啟后保留。 -
Auto delete
:隊(duì)列的自動(dòng)刪除。選擇是或否,以指定當(dāng)最后一個(gè)消費(fèi)者斷開(kāi)連接后,是否刪除隊(duì)列。 -
Arguments
:隊(duì)列的其他參數(shù)。這是可選的,你可以為隊(duì)列設(shè)置一些特定的參數(shù)。
-
-
填寫(xiě)完隊(duì)列信息后,點(diǎn)擊
Add queue
按鈕以創(chuàng)建隊(duì)列。 -
創(chuàng)建成功后,你將在
Queues
頁(yè)面上看到新添加的隊(duì)列。你可以在該頁(yè)面上查看隊(duì)列的詳細(xì)信息,包括消息數(shù)量、消費(fèi)者數(shù)量等。
http://localhost:15672/#/queues
只需要添加隊(duì)列名稱(chēng)就可以?
7.6?調(diào)用生產(chǎn)者
1、注入MessageSender
實(shí)例
@Autowired
private MessageSender messageSender;
2、在需要發(fā)送消息的地方調(diào)用messageSender.sendMessage
方法。根據(jù)你的業(yè)務(wù)邏輯,你可以在合適的位置調(diào)用該方法。例如,在訂單創(chuàng)建成功后,你可以添加以下代碼:
messageSender.sendMessage("訂單已創(chuàng)建:" + order.getOrderId());
7.7?創(chuàng)建消息接收者
創(chuàng)建消息接收者:創(chuàng)建一個(gè)消息接收者的類(lèi),用于處理接收到的RabbitMQ消息。
這里就直接寫(xiě)處理RabbitMQ消息的邏輯。
package com.example.usermanagement.mq;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("your_queue_name"),
exchange = @Exchange(value = RabbitMQConfig.EXCHANGE_NAME)
// key = "your_routing_key"
))
public void receiveMessage(Object message) {
System.out.println("Received message: " + message);
// 處理消息邏輯
}
}
注意:
?sendMessage 類(lèi)型使用的是Object類(lèi)型
your_queue_name
替換為你要監(jiān)聽(tīng)的隊(duì)列的名稱(chēng),(如myQueue)
將 your_routing_key
替換為適當(dāng)?shù)穆酚涉I(如果使用)
八、MQ界面介紹
Overview(概覽):提供了一份概覽報(bào)告,包括服務(wù)器和集群信息、節(jié)點(diǎn)狀態(tài)、隊(duì)列和連接摘要、以及最近的相關(guān)日志條目。
Connections(連接):顯示當(dāng)前連接到 RabbitMQ 服務(wù)器的客戶(hù)端應(yīng)用程序,包括連接的名稱(chēng)、協(xié)議、虛擬主機(jī)等信息。
Channels(通道):顯示每個(gè)連接上的活動(dòng)通道,以及與每個(gè)通道相關(guān)的一些指標(biāo),如消費(fèi)者數(shù)量、未確認(rèn)的消息數(shù)量等。
Exchanges(交換機(jī)):列出了所有的交換機(jī),包括名稱(chēng)、類(lèi)型、綁定的隊(duì)列和綁定的數(shù)量。
Queues(隊(duì)列):顯示了所有的隊(duì)列,包括名稱(chēng)、消息數(shù)量、消費(fèi)者數(shù)量等信息。您還可以通過(guò)隊(duì)列進(jìn)行一些操作,如創(chuàng)建、刪除、清空等。
Admin(管理員):提供了一些高級(jí)管理功能,如用戶(hù)和權(quán)限管理、虛擬主機(jī)管理、插件管理等。
8.1?Overview(概覽)
Overview(概覽):提供了一份概覽報(bào)告,包括服務(wù)器和集群信息、節(jié)點(diǎn)狀態(tài)、隊(duì)列和連接摘要、以及最近的相關(guān)日志條目。
?
8.2?Connections(連接)
Connections(連接):顯示當(dāng)前連接到 RabbitMQ 服務(wù)器的客戶(hù)端應(yīng)用程序,包括連接的名稱(chēng)、協(xié)議、虛擬主機(jī)等信息。
8.3?Channels(通道)
Channels(通道):顯示每個(gè)連接上的活動(dòng)通道,以及與每個(gè)通道相關(guān)的一些指標(biāo),如消費(fèi)者數(shù)量、未確認(rèn)的消息數(shù)量等。
8.3.1?prefetch(預(yù)?。?/h4>
在消息隊(duì)列(Message Queue)中,prefetch(預(yù)?。┦且粋€(gè)重要的概念,它用于控制消費(fèi)者從消息隊(duì)列中獲取消息的速度。
Prefetch 是指在消費(fèi)者端從消息隊(duì)列中獲取消息之前,先獲取一定數(shù)量的消息到本地緩存中,以供消費(fèi)者快速處理。這樣可以提高系統(tǒng)的吞吐量和效率,減少網(wǎng)絡(luò)傳輸?shù)拈_(kāi)銷(xiāo)。
具體來(lái)說(shuō),prefetch 可以幫助避免以下情況發(fā)生:
-
公平分發(fā)(Fair dispatching):在多個(gè)消費(fèi)者并行處理消息時(shí),如果沒(méi)有 prefetch 限制,一個(gè)消費(fèi)者可能會(huì)一次性獲取到過(guò)多的消息,導(dǎo)致其他消費(fèi)者得到較少的機(jī)會(huì)。通過(guò)設(shè)置 prefetch,可以確保每個(gè)消費(fèi)者只能獲取一定數(shù)量的消息,實(shí)現(xiàn)更公平的消息分發(fā)。
-
消費(fèi)者負(fù)載均衡(Consumer load balancing):當(dāng)消息隊(duì)列中有大量待處理的消息時(shí),消費(fèi)者可能會(huì)因?yàn)樘幚硭俣容^慢而積壓消息。通過(guò)設(shè)置合適的 prefetch 值,可以限制消費(fèi)者每次獲取的消息數(shù)量,使得消息能夠均勻地分配給多個(gè)消費(fèi)者,從而實(shí)現(xiàn)負(fù)載均衡。
在 RabbitMQ 中,prefetch 的設(shè)置可以通過(guò) basic.qos
方法來(lái)進(jìn)行配置。例如,以下代碼將設(shè)置 prefetch 數(shù)量為 10:
channel.basic_qos(prefetch_count=10)
請(qǐng)注意,prefetch 的設(shè)置應(yīng)該根據(jù)具體的應(yīng)用場(chǎng)景和系統(tǒng)負(fù)載情況進(jìn)行調(diào)優(yōu)。合理設(shè)置 prefetch 可以提高系統(tǒng)的性能和穩(wěn)定性。
8.4 Exchanges(交換機(jī))
Exchanges(交換機(jī)):列出了所有的交換機(jī),包括名稱(chēng)、類(lèi)型、綁定的隊(duì)列和綁定的數(shù)量。
8.5 Queues(隊(duì)列)
Queues(隊(duì)列):顯示了所有的隊(duì)列,包括名稱(chēng)、消息數(shù)量、消費(fèi)者數(shù)量等信息。您還可以通過(guò)隊(duì)列進(jìn)行一些操作,如創(chuàng)建、刪除、清空等。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-656857.html
8.6?Admin(管理員)
Admin(管理員):提供了一些高級(jí)管理功能,如用戶(hù)和權(quán)限管理、虛擬主機(jī)管理、插件管理等文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-656857.html
到了這里,關(guān)于RabbitMQ:可靠消息傳遞的強(qiáng)大消息中間件的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!