書接上文,展示一下五種模型我使用的是spring could 微服務(wù)的框架
文章說明:
? ? ? ? 本文章我會(huì)分享總結(jié)5種實(shí)用的rabbitMQ的實(shí)用模型
1、hello world簡單模型
2、work queues工作隊(duì)列
3、Publish/Subscribe發(fā)布訂閱模型
4、Routing路由模型
5、Topics主題模型
(贈(zèng)送)6、消息轉(zhuǎn)換器
開局重要介紹(一定一定要知道的)
RabbitTemplate的主要作用是用來簡化與RabbitMQ消息代理之間的通信過程。RabbitMQ是一種類似于消息隊(duì)列的消息代理系統(tǒng),可以實(shí)現(xiàn)應(yīng)用程序之間的異步通信。
使用RabbitTemplate,我們可以通過其提供的方法直接向RabbitMQ發(fā)送消息,而無需編寫其他低層級(jí)的代碼。這樣可以減少開發(fā)人員的工作量,同時(shí)提高代碼的可讀性和可維護(hù)性。
具體來說,RabbitTemplate提供了以下幾種類型的方法:
- 發(fā)送簡單的消息
- 發(fā)送帶有附加信息的消息
- 發(fā)送帶有事務(wù)支持的消息
- 發(fā)送響應(yīng)式消息
通過這些方法,我們可以更方便地與RabbitMQ進(jìn)行交互,同時(shí)也可以更靈活地使用RabbitMQ進(jìn)行消息通信。因此,RabbitTemplate是Spring框架中非常重要的一個(gè)組件,也為開發(fā)人員提供了很多便利。
使用方法很簡單注入就完事了,這個(gè)是自帶的
@Autowired
RabbitTemplate rabbitTemplate;
經(jīng)常使用的方法
rabbitTemplate.convertAndSend()
給大伙解釋一下:
rabbitTemplate.convertAndSend() 是 RabbitTemplate 類中的一個(gè)方法,它可以將一個(gè) Java 對(duì)象轉(zhuǎn)換為 RabbitMQ 可以接受的消息格式,并將其發(fā)送給指定的消息隊(duì)列或交換機(jī)。
通常情況下,我們會(huì)將一個(gè) Java對(duì)象轉(zhuǎn)換為一個(gè) JSON 字符串,然后將該字符串作為消息發(fā)送給 RabbitMQ。在這個(gè)過程中,RabbitTemplate 會(huì)自動(dòng)將 JSON 字符串加上一些消息頭,以便 RabbitMQ 能夠正確地理解和解析這個(gè)消息。
convertAndSend() 方法可以接受多個(gè)參數(shù),用于指定消息的目的地、消息內(nèi)容和其他一些選項(xiàng)。例如,您可以指定消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列(或者交換機(jī)),或者是否應(yīng)該啟用事務(wù)處理等。
總的來說,convertAndSend() 方法是一個(gè)非常方便的工具,可以讓我們更方便地與 RabbitMQ 進(jìn)行交互,同時(shí)也可以更靈活地使用 RabbitMQ 進(jìn)行消息通信。
五種模型實(shí)例
springboot依賴配置
1、首先在pom文件中導(dǎo)入依賴
我這里直接給代碼給大家直接CV即可
<!-- amqp依賴,包含Rabbitmq-->
<dependency>
? ?<groupId>org.springframework.boot</groupId>
? ?<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
?2、配置你的yml配置文件
因?yàn)槲沂褂玫膎acos所以有兩種方法
(1)直接在文件中的配置文件中添加相應(yīng)的配置
?
?(2)在nacos線上注冊(cè)系統(tǒng)中添加配置
?
?
?運(yùn)行一下看能不能正常啟動(dòng)
(注意:發(fā)現(xiàn)如果配置文件寫錯(cuò)會(huì)直接連接失?。┧晕抑苯咏o大家提供代碼
spring:
rabbitmq:
host:自己的Ip
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
prefetch: 1 # 每次只能獲取一條,處理完成才能獲取下一條
acknowledge-mode: manual # 設(shè)置消費(fèi)端手動(dòng)ack確認(rèn)
retry:
enabled: true # 是否支持重試
publisher-confirm-type: correlated #確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
publisher-returns: true #確認(rèn)消息已發(fā)送到隊(duì)列(Queue)
?溫馨提示一定是在bootstrap.yml或者在nacos中的yml格式下才能使用
?
已經(jīng)可以正常運(yùn)行了!
nacos注冊(cè)服務(wù)中心能正常看到
hello world簡單模型(使用簡單隊(duì)列)
一對(duì)一消費(fèi),只有一個(gè)消費(fèi)者能接收到
?消費(fèi)者
?消費(fèi)者代碼
@Component
public class SimpleConsumer {
@RabbitListener(queuesToDeclare = {@Queue("simple.queue")})// queuesToDeclare 自動(dòng)聲明隊(duì)列
public void simple(String message){
System.out.println("message"+message);
}
}
?生產(chǎn)者代碼
@RestController
public class TestController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/test/{massage}")
public void simpleTest(@PathVariable String massage){
rabbitTemplate.convertAndSend("simple.queue",massage);
}
}
其中的massage是要消費(fèi)的信息,正常要使用JSON的方法給要傳遞的信息變成JSON字符串來進(jìn)行如果需要對(duì)象的傳遞。根據(jù)自己的需要來進(jìn)行。
這樣一個(gè)簡單的模型就寫好了
要注意的是先使用
@RabbitListener(queuesToDeclare = {@Queue("simple.queue")})來指定隊(duì)列的名稱
讓我們來運(yùn)行一下看一下效果
?看到客戶端里面已經(jīng)有一個(gè)隊(duì)列
使用我路徑進(jìn)行傳參對(duì)我要消費(fèi)的信息進(jìn)行消費(fèi),測(cè)試一下
?
?沒有報(bào)錯(cuò)現(xiàn)在看看控制臺(tái)有沒有打印hello word
?ok已經(jīng)完成了基本的簡單隊(duì)列,看懂了嗎,快去試試吧!
work queues工作隊(duì)列
阿丹小解讀:
好幾個(gè)消費(fèi)者,你一個(gè)我一個(gè)分配消費(fèi)消息,有預(yù)取機(jī)制,默認(rèn)公平消費(fèi),可配置能者多勞模式,誰完成的快,誰多做一點(diǎn)。我稱之為內(nèi)耗隊(duì)列。
?話不多說上代碼!
配置和依賴同上
關(guān)鍵配置(yml):
取消預(yù)取機(jī)制,能者多勞配置
spring:
? rabbitmq:
? ? host: 127.0.0.1
? ? port: 5672
? ? username: guest
? ? password: guest
? ? virtual-host: /
? ? listener:?
? ? ? simple:
? ? ? ? prefetch: 1 # 每次只能獲取一條,處理完成才能獲取下一條
還是一個(gè)消費(fèi)者,但是這個(gè)消費(fèi)者有兩個(gè),所以這兩消費(fèi)者都要指定監(jiān)聽同一個(gè)隊(duì)列
@Component
public class WorkQueues {
@RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自動(dòng)聲明隊(duì)列
public void holloWordListener(String message) throws InterruptedException {
Thread.sleep(200);
System.out.println("message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自動(dòng)聲明隊(duì)列
public void holloWordListener1(String message) throws InterruptedException {
Thread.sleep(400);
System.out.println("message2 = " + message);
}
}
這樣就完成了兩個(gè)消費(fèi)者
然后進(jìn)行一個(gè)生產(chǎn)者開始進(jìn)行生產(chǎn)需要處理的信息
@GetMapping("/testworkqueue")
public void testWorkQueue(){
String queueName = "workQueue";
String message = "hello,work.queue";
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(queueName,message+i);
System.out.println("i = " + i);
}
}
最后讓我們運(yùn)行開始查看效果
使用postman對(duì)我寫的方法進(jìn)行訪問
?
能看到有不同的消費(fèi)者消費(fèi)了不同的信息
?這就是work queues工作隊(duì)列了,如果你看明白了去試試吧!
Publish/Subscribe發(fā)布訂閱模型
阿丹解讀:
這個(gè)模式使用到了交換機(jī),真的泰酷辣!但是要注意exchange(交換機(jī))是不給緩存信息的
它會(huì)將同一個(gè)消息給多個(gè)消費(fèi)者
使用了fanout交換機(jī),會(huì)將接收到的信息路由給每一個(gè)綁定的queue隊(duì)列
它可以使用在的場(chǎng)景:這個(gè)業(yè)務(wù)需要異步發(fā)送短信也要發(fā)送郵件那么就可以通過這個(gè)來使用它
來吧上代碼!
還是消費(fèi)者
@Component
public class Publish {
// 不指定隊(duì)列,消息過了就沒了
//? @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
// 指定隊(duì)列,可以接收緩存到隊(duì)列里的消息
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test",durable = "true" ),
exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
public void reveivel(String message){
System.out.println("message在我這里發(fā)送了短信 = " + message);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
public void reveivel2(String message){
System.out.println("message1在我這里發(fā)送了郵件 = " + message);
}
}
?給大家解釋解讀一下:
在這個(gè)例子中,@RabbitListener 注解被綁定到了一個(gè)名為 "test" 的隊(duì)列上,同時(shí)指定了該隊(duì)列使用 durable 模式(即持久化隊(duì)列)。
這個(gè)隊(duì)列綁定了一個(gè)名為 "fanoutTest" 的 fanout 類型的交換機(jī),通過 @QueueBinding 和 @Exchange 注解來指定。這個(gè)交換機(jī)分發(fā)消息到所有與之綁定的隊(duì)列中,而隊(duì)列則用來存儲(chǔ)這些消息。
當(dāng) RabbitMQ 服務(wù)接收到相關(guān)消息時(shí),會(huì)自動(dòng)觸發(fā)綁定了 @RabbitListener 注解的方法,即使該方法在應(yīng)用程序啟動(dòng)時(shí)并沒有手動(dòng)調(diào)用。
總的來說,@RabbitListener 注解和 @QueueBinding / @Exchange 注解的組合可以幫助開發(fā)者更方便地實(shí)現(xiàn) RabbitMQ 的監(jiān)聽功能,從而更加靈活地實(shí)現(xiàn)消息傳遞和處理的流程。
重點(diǎn)解讀:
"fanout" 是 RabbitMQ 中的一種交換機(jī)類型。在 RabbitMQ 中,消息可以被發(fā)送到交換機(jī),交換機(jī)再將消息分發(fā)到相關(guān)的隊(duì)列中。
"fanout" 類型的交換機(jī)會(huì)將它接收到的消息廣播給所有與之綁定的隊(duì)列,即所有隊(duì)列都會(huì)收到相同的消息。這種交換機(jī)類型適用于我們需要將消息傳遞給多個(gè)消費(fèi)者的場(chǎng)景。
除了 "fanout" 類型外,RabbitMQ 還支持多種其他類型的交換機(jī),例如:direct、topic、headers 等。不同類型的交換機(jī)有不同的消息路由規(guī)則,適用于不同的場(chǎng)景。開發(fā)者可以根據(jù)實(shí)際情況來選擇合適的交換機(jī)類型。
生產(chǎn)者:
@GetMapping("/testPushQueue")
public void tesyPubSubQueue(){
// 參數(shù)1:交換機(jī)名稱 ,
// 參數(shù)2 routingKey,(fanout類型可不寫) ,
// 參數(shù)3,消息內(nèi)容
rabbitTemplate.convertAndSend("fanoutTest","","阿丹的個(gè)人信息");
}
重點(diǎn)是這三個(gè)參數(shù)
運(yùn)行測(cè)試一下看看效果
老朋友postman?
效果實(shí)現(xiàn):
?
?總結(jié)一下使用這個(gè)模型的邏輯:
有些網(wǎng)站可能在用戶注冊(cè)或者登錄時(shí)要發(fā)送短信和郵件那么如果寫在一個(gè)里面的話。發(fā)送短信和發(fā)送郵件不是一個(gè)業(yè)務(wù)。所以要分開寫才對(duì)。
如果放在一起會(huì)導(dǎo)致運(yùn)行速度和處理速度被拖慢。
Routing路由模型
阿丹解讀:
routing模型也是將消息發(fā)送到交換機(jī)
但是使用的的類型和剛才的不一樣,這個(gè)使用的是direct(直接)類型的交換機(jī),它會(huì)將接到的消息按照規(guī)則路由到指定的queue隊(duì)列,所以是路由模式
上代碼!~
@Component
// 消費(fèi)者直接綁定交換機(jī),指定類型為direct,并指定key表示能消費(fèi)的key
public class Routing {
// 不指定隊(duì)列,消息過了就沒了
//? @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"info","error"})})
// 指定隊(duì)列,可以接收緩存到隊(duì)列里的消息
// key = {"info","error"} 表示我能接收到routingKey為 info和error的消息
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test1",durable = "true" ),
exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),
key = {"info", "error"})})
public void receivel(String message){
System.out.println("message = " + message);
}
// key = {"error"} 表示我只能接收到routingKey為 error的消息
@RabbitListener(bindings = {@QueueBinding(value = @Queue,
exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),
key = {"error"})})
public void receivel1(String message){
System.out.println("message1 = " + message);
}
}
代碼解讀
@RabbitListener 注解綁定到了一個(gè)名為 "direstTest" 的直連類型的交換機(jī)上,同時(shí)還指定了消息被路由到 "error" 隊(duì)列中。
直連類型的交換機(jī)會(huì)根據(jù)消息的路由鍵(即 key 參數(shù))將消息路由到指定的隊(duì)列中。在這個(gè)例子中,當(dāng)消息的路由鍵包含 "error" 時(shí),消息就會(huì)被發(fā)送到 "error" 隊(duì)列中。
這個(gè) @RabbitListener 注解沒有指定隊(duì)列的持久化模式(即是否將隊(duì)列存儲(chǔ)到磁盤上),因此默認(rèn)使用的是非持久化隊(duì)列。
總的來說,通過 @QueueBinding 和 @Exchange 注解組合使用,@RabbitListener 注解可以指示 RabbitMQ 監(jiān)聽指定的消息隊(duì)列,并在收到相關(guān)消息時(shí)觸發(fā)相應(yīng)的方法。這個(gè)方法可以根據(jù)特定的業(yè)務(wù)邏輯來處理消息,從而實(shí)現(xiàn)不同的消息傳遞和處理流程。
生產(chǎn)者!
@GetMapping("/testRoutinginfo")
public void direstExchangeTest(){
rabbitTemplate.convertAndSend("direstTest","info","發(fā)送info的key的路由消息");
}
@GetMapping("/testRoutinginfo1")
// 路由模型
public void direstExchangeTest1(){
rabbitTemplate.convertAndSend("direstTest","error","發(fā)送error的key的路由消息");
}
要注意的就是
"error"就是指定路由了,如果使用的是fanout類型則可以不寫,但是這個(gè)模式下要寫本次使用的是
direst類型。
開始運(yùn)行測(cè)試!
?
?
?使用場(chǎng)景解讀:
在一個(gè)電商網(wǎng)站中,我們需要將用戶提交的訂單消息分類到不同的隊(duì)列中,以便我們可以在不同的處理隊(duì)列中執(zhí)行不同的業(yè)務(wù)流程。
Routing 模型的具體實(shí)現(xiàn)方式是將消息發(fā)送到一個(gè) direct 類型的交換機(jī)上,該交換機(jī)會(huì)根據(jù)消息的 routing key(即路由鍵)將消息路由到綁定了對(duì)應(yīng) routing key 的隊(duì)列中。通過這種方式,我們可以實(shí)現(xiàn)對(duì)不同類型的消息進(jìn)行區(qū)分,并在不同的消費(fèi)者之間進(jìn)行分流和負(fù)載均衡。
當(dāng)然,在實(shí)際應(yīng)用中,Routing 模型也有一些局限性,例如不能實(shí)現(xiàn)消息的完全隨機(jī)路由以及不能支持消息的匹配模式等。因此,在選取消息傳遞模型時(shí),開發(fā)人員需要結(jié)合具體場(chǎng)景來選擇合適的方案。
?這就是以上的路由模型了現(xiàn)在你自己可以去試試了!
Topics主題模型
阿丹解讀:
Topics 主題模型適用于一些需要實(shí)現(xiàn)高級(jí)的消息路由匹配功能的業(yè)務(wù)場(chǎng)景。例如,我們可能需要將不同種類的消息路由到不同的隊(duì)列中,同時(shí)還需要根據(jù)消息主題、標(biāo)簽或其他屬性來過濾和分類消息。
Topics 模型的具體實(shí)現(xiàn)方式是將消息發(fā)送到一個(gè) topic 類型的交換機(jī)上,該交換機(jī)會(huì)根據(jù)消息的 routing key(即路由鍵)進(jìn)行模糊匹配,并將符合條件的消息發(fā)送到對(duì)應(yīng)的隊(duì)列中。這種模糊匹配方式通常使用通配符("*" 和 "#")來實(shí)現(xiàn)。
例如,我們可以使用 ".error" 的路由鍵將所有錯(cuò)誤類型的消息發(fā)送到一個(gè) "error" 隊(duì)列中;或者使用 "user." 的路由鍵將所有與用戶相關(guān)的消息發(fā)送到一個(gè)名為 "user" 的隊(duì)列中。
在實(shí)際應(yīng)用中,Topics 模型通常用于一些消息特別多的系統(tǒng),例如新聞網(wǎng)站、電商平臺(tái)、社交媒體等,這些系統(tǒng)需要對(duì)不同類型的消息進(jìn)行靈活的分類和過濾,以便更好地實(shí)現(xiàn)業(yè)務(wù)邏輯。
使用重點(diǎn)
topicExchange與directExchange類型,區(qū)別在于routingKey必須是多個(gè)單詞的列表,并且以?.?分隔
*(代表通配符,任意一個(gè)字段) ?user.name ?user.* ?[user.age, ?user.xxx]
#(號(hào)代表一個(gè)或多個(gè)字段 ??user.name ?user.name.age)?
上代碼?。。。。。。?!
消費(fèi)者!
@Component
public class Topics {
// 不指定隊(duì)列,消息過了就沒了
//? @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})
// 指定隊(duì)列,可以接收緩存到隊(duì)列里的消息
// key = {"user.save","user.*"} 表示能消費(fèi) routingkey為? user.save 和 user.任意一個(gè)字符? 的消息
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test2",durable = "true" ),exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})
public void recevicel(String message){
System.out.println("message = " + message);
}
// key = {"order.#","user.*"} 表示能消費(fèi) routingkey為? order.一個(gè)或多個(gè)字符? ?和? user.任意一個(gè)字符? 的消息
@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"order.#","user.*"})})
public void recevicel1(String message){
System.out.println("message1 = " + message);
}
}
解讀代碼:
@RabbitListener 注解綁定到了一個(gè)名為 "topicList" 的 topic 類型的交換機(jī)上,同時(shí)還指定了消息應(yīng)該路由到 "test2" 隊(duì)列中。
在這個(gè)例子中,交換機(jī)的名字為 "topicList",類型為 TOPIC。 key 參數(shù)指定了路由規(guī)則,它由兩部分組成:"user.save" 和 "user.*"。這個(gè)規(guī)則表示將所有路由鍵以 "user.save" 開頭的消息路由到 "test2" 隊(duì)列中,同時(shí)也會(huì)將所有路由鍵以 "user." 開頭的消息發(fā)送到 "test2" 隊(duì)列中。
這個(gè) @RabbitListener 注解還指定隊(duì)列的持久化模式為 true,即將隊(duì)列存儲(chǔ)到磁盤上,以便在 RabbitMQ 重啟后消息可以得到恢復(fù)。
總的來說,通過 @QueueBinding 和 @Exchange 注解組合使用,@RabbitListener 注解可以協(xié)助我們更加靈活地監(jiān)聽和處理 RabbitMQ 中的消息。在實(shí)際應(yīng)用中,我們可以根據(jù)具體的業(yè)務(wù)需求來定義不同的路由規(guī)則,以便更好地實(shí)現(xiàn)消息傳遞和處理的流程。
生產(chǎn)者!
@GetMapping("/topicTest")
public void topicTest(){
rabbitTemplate.convertAndSend("topicTest","user.save","topic路由消息,use.save");
}
@GetMapping("/topicTest1")
public void topicTest1(){
rabbitTemplate.convertAndSend("topicTest","order.select.getone","topic路由消息,order.select.getone");
}
運(yùn)行測(cè)試!
(有點(diǎn)小瑕疵后期補(bǔ)上)
消息轉(zhuǎn)換器(贈(zèng)送內(nèi)容)
代碼里直接發(fā)送對(duì)象,雖然接收的到消息,但是rabbitmq的界面上看到的消息會(huì)是亂碼?
文章來源:http://www.zghlxwxcb.cn/news/detail-755452.html
依賴
?<dependency>
? ? ?<groupId>com.fasterxml.jackson.dataformat</groupId>
? ? ?<artifactId>jackson-dataformat-xml</artifactId>
? ? ?<version>2.9.10</version>
?</dependency>
配置
@Configuration
public class RabbitmqConfig {
? // 消息轉(zhuǎn)換配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
?再次發(fā)送就會(huì)是轉(zhuǎn)換好的消息文章來源地址http://www.zghlxwxcb.cn/news/detail-755452.html
到了這里,關(guān)于MQ消息隊(duì)列,以及RabbitMQ詳細(xì)(中1)五種rabbitMQ實(shí)用模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!