RabbitMQ官方文檔
RabbitMQ 提供了5種常用消息模型。但是其實(shí)3、4、5這三種都屬于訂閱模型,只不過進(jìn)行路由的方式不同。


1.簡單消息隊(duì)列模型
簡單消息隊(duì)列官方文檔
1、創(chuàng)建簡單消息隊(duì)列
2、導(dǎo)入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、編寫生產(chǎn)者測試類SpringAmqpTest,并利用 RabbitTemplate 實(shí)現(xiàn)消息發(fā)送
@SpringBootTest
public class SpringAMQPTest {
//SpringAMQP提供的三個(gè)功能之一:封裝了 RabbitTemplate 工具,用于發(fā)送消息 (生產(chǎn)者)
@Autowired
private RabbitTemplate template;
@Test
void test(){
//利用RabbitTemplate提供的方法向指定隊(duì)列發(fā)送消息
template.convertAndSend("simple.queue","Hello SpringAQMP!");
}
}
4、編寫消費(fèi)者,監(jiān)聽隊(duì)列消息
@Component
public class MessageListener {
/**
* SpringAMQP提供的三個(gè)功能之二:基于注解的監(jiān)聽器模式,異步接收消息 (消費(fèi)者)
* @RabbitListener是SpringAMQP提供的監(jiān)聽消息隊(duì)列的注解
* @param message
*/
@RabbitListener(queues = "simple.queue")
public void basicQueueListener(String message) {
System.out.println("消費(fèi)者接收到消息:" + message);
}
}
5、編寫生產(chǎn)者和消費(fèi)者的application.yml文件
spring:
rabbitmq:
host: 192.168.6.131
port: 5672
# 虛擬主機(jī)
virtual-host: /
username: rabbitmq
password: 123456
6、測試
運(yùn)行生產(chǎn)者代碼后,simple.queue 隊(duì)列中產(chǎn)生了一條待消費(fèi)的消息。
運(yùn)行消費(fèi)者代碼后,隊(duì)列中的消息被消費(fèi)
2.Work工作隊(duì)列模型
work工作隊(duì)列官方文檔
1、 創(chuàng)建 work.queue 隊(duì)列
2、創(chuàng)建生產(chǎn)者測試類
@Test
void testWorkQueue(){
String queueName = "work.queue";
String message = "Hello SpringAQMP-";
for (int i = 0; i < 20; i++) {
template.convertAndSend(queueName,message+i);
}
}
3、創(chuàng)建消費(fèi)者測試類
@RabbitListener(queues = "work.queue")
public void workQueueListener01(String message) throws InterruptedException {
System.out.println("消費(fèi)者01接收到消息:" + message + " - " + LocalTime.now());
Thread.sleep(20);
}
@SneakyThrows
@RabbitListener(queues = "work.queue")
public void workQueueListener02(String message){
System.out.println("消費(fèi)者02接收到消息:" + message + " - " + LocalTime.now());
4、修改消費(fèi)者的application.yml
正常情況下每個(gè)消費(fèi)者要消費(fèi)的消息數(shù)量是一樣的。消息是平均分配給每個(gè)消費(fèi)者,并沒有考慮到消費(fèi)者的處理能力。這樣顯然不合理,因此修改消費(fèi)者的配置文件使得每個(gè)消費(fèi)者按照能力順序處理信息
spring:
rabbitmq:
listener:
simple: # 簡單隊(duì)列和work隊(duì)列的配置
prefetch: 1 # 每次只能獲取一條消息,處理完成才獲取下一條消息
5、測試
運(yùn)行生產(chǎn)者和消費(fèi)者類,兩個(gè)消費(fèi)者收到的同一個(gè)隊(duì)列中的消息肯定是不一樣的。
3.發(fā)布訂閱模型
在發(fā)布訂閱模型中,多了一個(gè)exchange角色,而且過程略有變化:
-
producer:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))
-
exchange:交換機(jī),一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于exchange的類型。exchange 有以下3種類型:
- fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
- direct:定向,把消息交給符合指定 routing key 的隊(duì)列
- topic:通配符,把消息交給符合 routing pattern 的隊(duì)列
-
consumer:消費(fèi)者,訂閱隊(duì)列
-
queue:消息隊(duì)列,接收消息、緩存消息。
exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與 exchange 綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)被丟棄!
3.1.Fanout廣播
Fanout廣播官方文檔
在廣播模式下,消息發(fā)送流程是這樣的:
-
可以有多個(gè)消費(fèi)者
-
每個(gè)消費(fèi)者有自己的queue(隊(duì)列)
-
每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
-
生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定。
-
交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
-
隊(duì)列的消費(fèi)者都能拿到消息。實(shí)現(xiàn)一條消息被多個(gè)消費(fèi)者消費(fèi)
-
使用內(nèi)置的 amq.fanout 交換機(jī)
-
兩個(gè)隊(duì)列不用手動(dòng)創(chuàng)建,是在下面
1、創(chuàng)建生產(chǎn)者測試類
@Test
void testFanoutExchange() {
rabbitTemplate.convertAndSend("amq.fanout", "", "Hello amq.fanout");
}
2、創(chuàng)建消費(fèi)者測試類
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue01"),
exchange = @Exchange(name = "amq.fanout", type = ExchangeTypes.FANOUT)
))
public void fanoutQueueListener01(String message) {
System.out.println("消費(fèi)者01接收到fanout.queue01的消息:" + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue02"),//使用@Queue(name = "fanout.queue01")RabbitMQ自動(dòng)生成隊(duì)列
exchange = @Exchange(name = "amq.fanout", type = ExchangeTypes.FANOUT)
))
public void fanoutQueueListener02(String message) {
System.out.println("消費(fèi)者02接收到fanout.queue02的消息:" + message);
}
}
3、測試
運(yùn)行生產(chǎn)者和消費(fèi)者
3.2.Direct路由
Direct路由官方文檔
在 Fanout 模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到 direct 類型的 exchange。
Direct模型下:
- 隊(duì)列與交換機(jī)的綁定要指定一個(gè) RoutingKey
- 消息的發(fā)送方在向 exchange 發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
- exchange 不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey 與消息的 Routing key 完全一致,才會(huì)接收到消息
1、創(chuàng)建生產(chǎn)者測試類
@Test
void testDirectExchange() {
rabbitTemplate.convertAndSend("amq.direct", "save", "新增通知");
rabbitTemplate.convertAndSend("amq.direct", "delete", "刪除通知");
}
2、創(chuàng)建消費(fèi)者測試類
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue01"),
exchange = @Exchange(name = "amq.direct", type = ExchangeTypes.DIRECT),
key = {"save"}
))
// 處理新增的業(yè)務(wù)
public void directQueueListener01(String message){
System.out.println("消費(fèi)者01接收到direct.queue01的消息:" + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue02"),
exchange = @Exchange(name = "amq.direct", type = ExchangeTypes.DIRECT),
key = {"delete"}
))
// 處理刪除的業(yè)務(wù)
public void directQueueListener02(String message){
System.out.println("消費(fèi)者02接收到direct.queue02的消息:" + message);
}
3、測試
運(yùn)行生產(chǎn)者和消費(fèi)者測試類
3.3.Topics通配符
Topics通配符官方文檔
Topics 類型的 Exchange 與 Direct 相比,都是可以根據(jù) RoutingKey 把消息路由到不同的隊(duì)列。
不同的是 Topic類型的 Exchange 可以讓隊(duì)列在綁定 Routingkey 的時(shí)候使用通配符
通配符規(guī)則:
-
#:匹配一個(gè)或多個(gè)單詞
-
*:匹配一個(gè)單詞
舉例:
-
user.#:能夠匹配 user.add 或者 user.detail.add
-
user.*:只能匹配 user.add
1、創(chuàng)建生產(chǎn)者測試類
@Test
public void testTopicExchange() {
rabbitTemplate.convertAndSend("amq.topic", "user.add", "新增用戶通知");
rabbitTemplate.convertAndSend("amq.topic", "user.update", "更新戶通知");
rabbitTemplate.convertAndSend("amq.topic", "dept.add", "新增部門通知");
rabbitTemplate.convertAndSend("amq.topic", "dept.update", "更新部門通知");
}
2、創(chuàng)建消費(fèi)者測試類
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue01"),
exchange = @Exchange(name = "amq.topic", type = ExchangeTypes.TOPIC),
key = "user.*"
))
public void topicQueueListener01(String message){
System.out.println("消費(fèi)者01接收到topic.queue01的消息:" + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue02"),
exchange = @Exchange(name = "amq.topic", type = ExchangeTypes.TOPIC),
key = "dept.*"
))
public void topicQueueListener02(String message){
System.out.println("消費(fèi)者02接收到topic.queue02的消息:" + message);
}
3、測試文章來源:http://www.zghlxwxcb.cn/news/detail-736175.html
運(yùn)行生產(chǎn)者和消費(fèi)者測試類文章來源地址http://www.zghlxwxcb.cn/news/detail-736175.html
到了這里,關(guān)于中間件_RabbitMQ五種消息模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!