目錄
1.MQ的相關概念
1.1 什么是MQ消息中間件
1.2 為什么使用MQ
(1) 應用解耦
(2) 異步提速 ?
(3)削峰填谷
1.3 使用MQ的劣勢
1.4 常見的MQ組件???????
2. RabbitMQ的概述
2.1 RabbitMQ的概念
2.2 RabbitMQ的原理
2.3 安裝RabbitMQ
3. RabbitMQ 的工作模式
3.1 simple (簡單模式)
3.2?Work queues(工作模式)
3.3.Publish/Subscribe(發(fā)布訂閱模式)
3.4.Routing(路由模式)
3.5.Topics(主題模式)
4.springboot整合RabbitMQ
4.1.生產方
4.2.消費方
4.3.通過代碼創(chuàng)建交換機和隊列
1.MQ的相關概念
1.1 什么是MQ消息中間件
MQ全稱 Message Queue(消息隊列),是在消息的傳輸過程中保存消息的容器。它是應用程序和應用程序之間的通信方法。
1.2 為什么使用MQ
在項目中,可將一些無需即時返回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節(jié)省了服務器的請求響應時間,從而提高了系統(tǒng)的吞吐量。
MQ總結為三個好處:
(1) 應用解耦
以電商應用為例,應用中有訂單系統(tǒng)、庫存系統(tǒng)、物流系統(tǒng)、支付系統(tǒng)。用戶創(chuàng)建訂單后,如果耦合調用庫存系統(tǒng)、物流系統(tǒng)、支付系統(tǒng),任何一個子系統(tǒng)出了故障,都會造成下單操作異常。當轉變成基于消息隊列的方式后,系統(tǒng)間調用的問題會減少很多,比如物流系統(tǒng)因為發(fā)生故障,需要幾分鐘來修復。在這幾分鐘的時間里,物流系統(tǒng)要處理的內容被緩存在消息隊列中,用戶的下單操作可以正常完成。當物流系統(tǒng)恢復后,繼續(xù)處理訂單信息即可,中間用戶感受不到物流系統(tǒng)的故障,提升系統(tǒng)的可用性。
?
(2) 異步提速 ?
上面要完成下單需要花費的時間: 20 + 300 + 300 + 300 = 920ms 用戶點擊完下單按鈕后,需要等待920ms才能得到下單響應,太慢!
使用MQ可以解決上述問題
?用戶點擊完下單按鈕后,只需等待25ms就能得到下單響應 (20 + 5 = 25ms)。 提升用戶體驗和系統(tǒng)吞吐量(單位時間內處理請求的數(shù)目)
(3)削峰填谷
舉個例子,如果訂單系統(tǒng)最多能處理一千次訂單,這個處理能力應付正常時段的下單時綽綽有余,正常時段我們下單一秒后就能返回結果。但是在高峰期,如果有兩千次下單操作系統(tǒng)是處理不了的,只能限制訂單超過一千后不允許用戶下單。使用消息隊列做緩沖,我們可以取消這個限制,把一秒內下的訂單分散成一段時間來處理,這時有些用戶可能在下單十幾秒后才能收到下單成功的操作,但是比不能下單的體驗要好。 簡單來說: 就是在訪問量劇增的情況下,但是應用仍然不能停,比如“雙十一”下單的人多,但是淘寶這個應用仍然要運行,所以就可以使用消息中間件采用隊列的形式減少突然訪問的壓力
使用了 MQ 之后,限制消費消息的速度為1000,這樣一來,高峰期產生的數(shù)據(jù)勢必會被積壓在 MQ 中,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000,直到消費完積壓的消息,這就叫做“填谷”。
使用MQ后,可以提高系統(tǒng)穩(wěn)定性
1.3 使用MQ的劣勢
- 系統(tǒng)可用性降低 系統(tǒng)引入的外部依賴越多,系統(tǒng)穩(wěn)定性越差。一旦 MQ 宕機,就會對業(yè)務造成影響。如何保證MQ的高可用?
- 系統(tǒng)復雜度提高 MQ 的加入大大增加了系統(tǒng)的復雜度,以前系統(tǒng)間是同步的遠程調用,現(xiàn)在是通過 MQ 進行異步調用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
- 一致性問題 A 系統(tǒng)處理完業(yè)務,通過 MQ 給B、C、D三個系統(tǒng)發(fā)消息數(shù)據(jù),如果 B 系統(tǒng)、C 系統(tǒng)處理成功,D 系統(tǒng)處理失敗。如何保證消息數(shù)據(jù)處理的一致性?
1.4 常見的MQ組件
目前業(yè)界有很多的 MQ 產品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充當消息隊列的案例,而這些消息隊列產品,各有側重,在實際選型時,需要結合自身需求及 MQ 產品特征
2. RabbitMQ的概述
2.1 RabbitMQ的概念
- 2007 年發(fā)布,是一個在 AMQP(高級消息隊列協(xié)議)基礎上完成的,可復用的企業(yè)消息系統(tǒng),是當前最主流的消息中間件之一。
- RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue 高級消息隊列協(xié)議 )的開源實現(xiàn),由于erlang 語言的高并發(fā)特性,性能較好,本質是個隊列,F(xiàn)IFO 先入先出,里面存放的內容是message
- ? RabbitMQ是一個消息中間件:它接受并轉發(fā)消息。你可以把它當做一個快遞站點,當你要發(fā)送一個包裹時,你把你的包裹放到快遞站,快遞員最終會把你的快遞送到收件人那里,按照這種邏輯RabbitMQ是一個快遞站,一個快遞員幫你傳遞快件。RabbitMQ與快遞站的主要區(qū)別在于,它不處理快件而是接收,存儲和轉發(fā)消息數(shù)據(jù)。
2.2 RabbitMQ的原理
名詞解釋:
- Broker:接收和分發(fā)消息的應用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創(chuàng)建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之間的 TCP 連接
- Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創(chuàng)建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開銷.
- Exchange:message 到達 broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最終被送到這里等待 consumer 取走
- Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)
2.3 安裝RabbitMQ
安裝詳情------>虛擬機安裝RabbitMQ
3. RabbitMQ 的工作模式
RabbitMQ 提供了 6 種工作模式:簡單模式、work queues、Publish/Subscribe 發(fā)布與訂閱模式、Routing 路由模式、Topics 主題模式、RPC 遠程調用模式(遠程調用,不太算 MQ;暫不作介紹)。 官網對應模式介紹:RabbitMQ Tutorials — RabbitMQ
3.1 simple (簡單模式)
在上圖的模型中,有以下概念:
P:生產者,也就是要發(fā)送消息的程序
C:消費者:消息的接收者,會一直等待消息到來
queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息
演示:
創(chuàng)建工程
?加依賴
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> </dependencies>
?生產者:
package com.wqg.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:簡單模式-生產者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class SimpleProducer { public static void main(String[] args) throws Exception { //創(chuàng)建連接工廠對象并設置連接信息 -----獲取連接對象(指定rabbitMQ服務端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服務端的地址 默認localhost connectionFactory.setHost("192.168.75.129"); //設置端口號 connectionFactory.setPort(5672); //設置賬號 默認guest connectionFactory.setUsername("guest"); //設置密碼 默認guest connectionFactory.setPassword("guest"); //設置虛擬主機 默認/ connectionFactory.setVirtualHost("/"); //獲取連接對象 Connection connection = connectionFactory.newConnection(); //獲取Channel信道對象 Channel channel = connection.createChannel(); //創(chuàng)建一個隊列對象 /** * String queue, 隊列的名稱. 如果該名稱不存在 則創(chuàng)建 如果存在則不創(chuàng)建 * boolean durable, 該對象是否持久化 當rabbitmq重啟后 隊列就會消失 * boolean exclusive, 該隊列是否被一個消費者獨占 * boolean autoDelete,當沒有消費者時,該隊列是否被自動刪除 * Map<String, Object> arguments: 額外參數(shù)的設置 * */ channel.queueDeclare("simple-queue",true,false,false,null); //發(fā)送信息 /** * String exchange, 交換機的名稱 簡單模式沒有交換機使用""表示采用默認交換機 * String routingKey, 路由標識 如果是簡單模式起名為隊列的名稱 * BasicProperties props, 消息的屬性設置。 設置為null * byte[] body: 消息的內容 */ String msg = "Hello RabbitMQ~~~"; channel.basicPublish("","simple-queue",null,msg.getBytes()); //關閉資源 channel.close(); connectionFactory.clone(); } }
消費者:
3.2?Work queues(工作模式)
?Work Queues:與入門程序的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。
應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度
生產者:
package com.wqg.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:工作模式-生產者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class WorkProducer { public static void main(String[] args) throws Exception { //創(chuàng)建連接工廠對象并設置連接信息 -----獲取連接對象(指定rabbitMQ服務端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服務端的地址 默認localhost connectionFactory.setHost("192.168.75.129"); //設置端口號 connectionFactory.setPort(5672); //設置賬號 默認guest connectionFactory.setUsername("guest"); //設置密碼 默認guest connectionFactory.setPassword("guest"); //設置虛擬主機 默認/ connectionFactory.setVirtualHost("/"); //獲取連接對象 Connection connection = connectionFactory.newConnection(); //獲取Channel信道對象 Channel channel = connection.createChannel(); //創(chuàng)建一個隊列對象 /** * String queue, 隊列的名稱. 如果該名稱不存在 則創(chuàng)建 如果存在則不創(chuàng)建 * boolean durable, 該對象是否持久化 當rabbitmq重啟后 隊列就會消失 * boolean exclusive, 該隊列是否被一個消費者獨占 * boolean autoDelete,當沒有消費者時,該隊列是否被自動刪除 * Map<String, Object> arguments: 額外參數(shù)的設置 * */ channel.queueDeclare("Work-Queue", true, false, false, null); //發(fā)送信息 /** * String exchange, 交換機的名稱 簡單模式沒有交換機使用""表示采用默認交換機 * String routingKey, 路由標識 如果是簡單模式起名為隊列的名稱 * BasicProperties props, 消息的屬性設置。 設置為null * byte[] body: 消息的內容 */ for (int i=0; i<10; i++){ String msg = "Hello RabbitMQ~~~工作模式"; channel.basicPublish("", "Work-Queue", null, msg.getBytes()); } //關閉資源 channel.close(); connectionFactory.clone(); } }
消費者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:工作模式-消費者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class WorkConsumer01 { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受隊列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消費者的標簽 * @param envelope : 設置 拿到你的交換機 路由key等信息 * @param properties: 消息的屬性對象 * @param body: 消息的內容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的內容===" + new String(body)); } }; /** * String queue, 隊列名 * boolean autoAck,是否自動確認。 當rabbitmq把消息發(fā)送給消費后,消費端自動確認消息。 * Consumer callback:回調。 當rabbitmq隊列中存在消息 則觸發(fā)該回調 */ channel.basicConsume("Work-Queue", true, consumer); } }
總結: 在一個隊列中如果有多個消費者,那么消費者之間對于同一個消息的關系是競爭的關系。
Work Queues 對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
例如:短信服務部署多個,只需要有一個節(jié)點成功發(fā)送即可。
3.3.Publish/Subscribe(發(fā)布訂閱模式)
在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:
P:生產者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給X(交換機)
C:消費者,消息的接收者,會一直等待消息到來
Queue:消息隊列,接收消息、緩存消息
Exchange:交換機(X)。一方面,接收生產者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機的隊列
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Exchange(交換機)只負責轉發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange 綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
生產者:
package com.wqg.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:發(fā)布訂閱模式-生產者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class PublishProducer { public static void main(String[] args) throws Exception { //創(chuàng)建連接工廠對象并設置連接信息 -----獲取連接對象(指定rabbitMQ服務端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服務端的地址 默認localhost connectionFactory.setHost("192.168.75.129"); //設置端口號 connectionFactory.setPort(5672); //設置賬號 默認guest connectionFactory.setUsername("guest"); //設置密碼 默認guest connectionFactory.setPassword("guest"); //設置虛擬主機 默認/ connectionFactory.setVirtualHost("/"); //獲取連接對象 Connection connection = connectionFactory.newConnection(); //獲取Channel信道對象 Channel channel = connection.createChannel(); //創(chuàng)建交換機 /** * String exchange, 交換機的名稱 如果不存在則創(chuàng)建 存在則不創(chuàng)建 * BuiltinExchangeType type, 交換機的類型 * boolean durable: 是否持久化。 */ channel.exchangeDeclare("Publish-exchange", BuiltinExchangeType.FANOUT,true); //創(chuàng)建隊列 channel.queueDeclare("Publish-Queue01", true, false, false, null); channel.queueDeclare("Publish-Queue02", true, false, false, null); //隊列和交換機綁定 /** * String queue, * String exchange, * String routingKey: 發(fā)布訂閱模式 沒有routingKey 則寫為"" */ channel.queueBind("Publish-Queue01","Publish-exchange",""); channel.queueBind("Publish-Queue02","Publish-exchange",""); //發(fā)送信息 String msg = "Hello RabbitMQ~~~發(fā)布訂閱模式"; channel.basicPublish("Publish-exchange","", null, msg.getBytes()); //關閉資源 channel.close(); connectionFactory.clone(); } }
?消費者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:發(fā)布訂閱模式-消費者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class PublishConsumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受隊列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消費者的標簽 * @param envelope : 設置 拿到你的交換機 路由key等信息 * @param properties: 消息的屬性對象 * @param body: 消息的內容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的內容===" + new String(body)); } }; /** * String queue, 隊列名 * boolean autoAck,是否自動確認。 當rabbitmq把消息發(fā)送給消費后,消費端自動確認消息。 * Consumer callback:回調。 當rabbitmq隊列中存在消息 則觸發(fā)該回調 */ //Publish-Queue02和Publish-Queue01只有一個里面有數(shù)據(jù) channel.basicConsume("Publish-Queue02", true,consumer); } }
交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。
發(fā)布訂閱模式與工作隊列模式的區(qū)別:
- 工作隊列模式不用定義交換機,而發(fā)布/訂閱模式需要定義交換機
- 發(fā)布/訂閱模式的生產方是面向交換機發(fā)送消息,工作隊列模式的生產方是面向隊列發(fā)送消息(底層使用默認交換機)
- 發(fā)布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機
3.4.Routing(路由模式)
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個 RoutingKey(路由key)
消息的發(fā)送方在向 Exchange 發(fā)送消息時,也必須指定消息的 RoutingKey
Exchange 不再把消息交給每一個綁定的隊列,而是根據(jù)消息的 Routing Key 進行判斷,只有隊列的Routingkey 與消息的 Routing key 完全一致,才會接收到消息
P:生產者,向 Exchange 發(fā)送消息,發(fā)送消息時,會指定一個routing key
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給與 routing key 完全匹配的隊列
C1:消費者,其所在隊列指定了需要 routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要 routing key 為 info、error、warning 的消息
生產者:
package com.wqg.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:路由模式-生產者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class RouterProducer { public static void main(String[] args) throws Exception { //創(chuàng)建連接工廠對象并設置連接信息 -----獲取連接對象(指定rabbitMQ服務端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服務端的地址 默認localhost connectionFactory.setHost("192.168.75.129"); //設置端口號 connectionFactory.setPort(5672); //設置賬號 默認guest connectionFactory.setUsername("guest"); //設置密碼 默認guest connectionFactory.setPassword("guest"); //設置虛擬主機 默認/ connectionFactory.setVirtualHost("/"); //獲取連接對象 Connection connection = connectionFactory.newConnection(); //獲取Channel信道對象 Channel channel = connection.createChannel(); //創(chuàng)建交換機 channel.exchangeDeclare("Router-exchange", BuiltinExchangeType.DIRECT,true); //創(chuàng)建隊列 channel.queueDeclare("Router-queue001",true,false,false,null); channel.queueDeclare("Router-queue002",true,false,false,null); //隊列和交換機綁定 channel.queueBind("Router-queue001","Router-exchange","error"); channel.queueBind("Router-queue002","Router-exchange","error"); channel.queueBind("Router-queue002","Router-exchange","info"); channel.queueBind("Router-queue002","Router-exchange","warning"); String msg = "Hello RabbitMQ~~~路由模式"; channel.basicPublish("Router-exchange","info",null,msg.getBytes()); channel.close(); connectionFactory.clone(); } }
消費者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:路由模式-消費者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class RouterConsumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受隊列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消費者的標簽 * @param envelope : 設置 拿到你的交換機 路由key等信息 * @param properties: 消息的屬性對象 * @param body: 消息的內容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的內容===" + new String(body)); } }; /** * String queue, 隊列名 * boolean autoAck,是否自動確認。 當rabbitmq把消息發(fā)送給消費后,消費端自動確認消息。 * Consumer callback:回調。 當rabbitmq隊列中存在消息 則觸發(fā)該回調 */ channel.basicConsume("Router-queue002", true,consumer); } }
總結:
Routing?模式要求隊列在綁定交換機時要指定?routing key,消息會轉發(fā)到符合 routing key?的隊列。
3.5.Topics(主題模式)
- Topic?類型與?Direct?相比,都是可以根據(jù)?RoutingKey?把消息路由到不同的隊列。只不過 Topic?類型Exchange?可以讓隊列在綁定Routing key 的時候使用通配符!
- Routingkey?一般都是有一個或多個單詞組成,多個單詞之間以”.” 分割,例如: item.insert
- 通配符規(guī)則:#?匹配一個或多個詞,*?匹配不多不少恰好1個詞, 例如:item.#?能夠匹配?item.insert.abc?或者?item.insert,item.* 只能匹配 item.insert
生產者:??
package com.wqg.producer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ fileName:SimpleProducer * @ description:主題模式-生產者 * @ author:wqg * @ createTime:2023/7/12 16:26 */ public class TopicsProducer { public static void main(String[] args) throws Exception { //創(chuàng)建連接工廠對象并設置連接信息 -----獲取連接對象(指定rabbitMQ服務端的信息) ConnectionFactory connectionFactory = new ConnectionFactory(); //rabbitMQ服務端的地址 默認localhost connectionFactory.setHost("192.168.75.129"); //設置端口號 connectionFactory.setPort(5672); //設置賬號 默認guest connectionFactory.setUsername("guest"); //設置密碼 默認guest connectionFactory.setPassword("guest"); //設置虛擬主機 默認/ connectionFactory.setVirtualHost("/"); //獲取連接對象 Connection connection = connectionFactory.newConnection(); //獲取Channel信道對象 Channel channel = connection.createChannel(); //創(chuàng)建交換機 channel.exchangeDeclare("Topics-exchange", BuiltinExchangeType.TOPIC,true); //創(chuàng)建隊列 channel.queueDeclare("Topics-queue001",true,false,false,null); channel.queueDeclare("Topics-queue002",true,false,false,null); //隊列和交換機綁定 channel.queueBind("Topics-queue001","Topics-exchange","*.orange.*"); channel.queueBind("Topics-queue002","Topics-exchange","*.*.rabbit"); channel.queueBind("Topics-queue002","Topics-exchange","lazy.#"); String msg = "Hello RabbitMQ~~~主題模式"; channel.basicPublish("Topics-exchange","lazy.rabbit.orange",null,msg.getBytes()); channel.close(); connectionFactory.clone(); } }
消費者:
package com.wqg.consumer; import com.rabbitmq.client.*; import java.io.IOException; /** * @ fileName:SimpleConsumer * @ description:路由模式-消費者 * @ author:WQG * @ createTime:2023/7/12 16:25 */ public class TopicsConsumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.75.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare("Work-queue", true, false, false, null); //接受隊列中的信息 Consumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag: 消費者的標簽 * @param envelope : 設置 拿到你的交換機 路由key等信息 * @param properties: 消息的屬性對象 * @param body: 消息的內容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受的內容===" + new String(body)); } }; /** * String queue, 隊列名 * boolean autoAck,是否自動確認。 當rabbitmq把消息發(fā)送給消費后,消費端自動確認消息。 * Consumer callback:回調。 當rabbitmq隊列中存在消息 則觸發(fā)該回調 */ channel.basicConsume("Topics-queue002", true,consumer); } }
Topic?主題模式可以實現(xiàn)?Pub/Sub?發(fā)布與訂閱模式和?Routing?路由模式的功能,
只是 Topic?在配置routing key?的時候可以使用通配符,顯得更加靈活。
4.springboot整合RabbitMQ
4.1.生產方
創(chuàng)建springboot項目----生產者(1) 引入 rabbitmq 整合的依賴<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
(2) 再配置文件中添加 rabbit 服務的信息#rabbitmq的配置 spring.rabbitmq.host=192.168.75.129 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
(3) 調用 RabbitTemplate 中發(fā)送消息的方法@SpringBootTest class RabbitmqSpringbootApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("Topics-exchange","lazy.aaa","Hello RabbitMQ..."); } }
4.2.消費方
創(chuàng)建springboot項目----消費者
(1)引入rabbitmq整合的依賴------同上
(2)再配置文件中添加rabbit服務的信息------同上文章來源:http://www.zghlxwxcb.cn/news/detail-556145.html
(3) 創(chuàng)建類 --- 創(chuàng)建監(jiān)聽方法即可 @RabbitListener@Component public class MyListener { //queues:表示你監(jiān)聽的隊列名 @RabbitListener(queues = {"Topics-queue002"}) public void h(Message message){ //把監(jiān)聽到的消息封裝到Message類對象中 byte[] body = message.getBody(); System.out.println("消息內容==="+new String(body)); } }
文章來源地址http://www.zghlxwxcb.cn/news/detail-556145.html
4.3.通過代碼創(chuàng)建交換機和隊列
package com.wqg.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ fileName:RabbitConfig * @ description: * @ author:wqg * @ createTime:2023/7/12 19:41 */ @Configuration public class RabbitConfig { //定義了一個名為EXCHANGE_NAME的常量,用于表示交換器的名稱 public static final String EXCHANGE_NAME = "Topics-queue002"; @Bean public Exchange exchange() { // 創(chuàng)建一個Topic類型的Exchange,名稱為EXCHANGE_NAME,持久化 Exchange topic_exchange02 = ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); return topic_exchange02; } @Bean public Queue queue() { // 創(chuàng)建一個持久化的隊列,名稱為"Topics-queue003" Queue topic_queue03 = QueueBuilder.durable("Topics-queue003").build(); return topic_queue03; } @Bean public Binding binding() { // 創(chuàng)建一個綁定關系,將隊列綁定到Exchange上,并指定routing key為"qy165.#",不使用任何參數(shù) Binding noargs = BindingBuilder.bind(queue()).to(exchange()).with("qy165.#").noargs(); return noargs; } //如果交換機要綁定多個隊列 需要再寫一個bind方法 }
到了這里,關于RabbitMQ常用工作模式+整合springboot的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!