★ 發(fā)送消息
- Spring Boot可以將AmqpAdmin和AmqpTemplate注入任何其他組件,
接下來該組件即可通過AmqpAdmin來管理Exchange、隊列和綁定,還可通過AmqpTemplate來發(fā)送消息。
- Spring Boot還會自動配置一個RabbitMessagingTemplate Bean(RabbitAutoConfiguration負(fù)責(zé)配置),
如果想使用它來發(fā)送、接收消息,
可使用RabbitMessagingTemplate代替上面的AmqpTemplate,兩個Template的注入方式完全相同。
★ 創(chuàng)建隊列的兩種方式
方式一(編程式):在程序中通過AmqpAdmin創(chuàng)建隊列。
方式二(配置式):在容器中配置 org.springframework.amqp.core.Queue 類型的Bean,
RabbitMQ將會自動為該Bean創(chuàng)建對應(yīng)的隊列。
代碼演示
需求1:發(fā)送消息
1、ContentUtil 先定義常量
2、RabbitMQConfig 創(chuàng)建隊列的兩種方式之一:
配置式:
在容器中配置 org.springframework.amqp.core.Queue 類型的Bean,RabbitMQ將會自動為該Bean創(chuàng)建對應(yīng)的隊列。
就是在配置類中創(chuàng)建一個生成消息隊列的@Bean。
問題:
用 @Configuration 注解聲明為配置類,但是項目啟動的時候沒有自動生成這個隊列。
據(jù)了解是因為RabbitMQ使用了懶加載,大概是沒有消費者監(jiān)聽這個隊列,就沒有創(chuàng)建。
但是當(dāng)我寫后面的代碼后,這個消息隊列就生成了,但是并沒有消費者去監(jiān)聽這個隊列。
這有點想不通。
不知道后面是哪里的代碼讓這個配置類能成功聲明這個消息隊列出來。
水落石出:
經(jīng)過測試:
在下面的MessageService 這個類中,依賴注入了 AmqpAdmin 和 AmqpTemplate 這兩個對象,當(dāng)我們通過這兩個對象去聲明隊列、Exchange 和綁定的時候,配置類中的創(chuàng)建消息隊列的bean就能成功創(chuàng)建隊列。
這張圖結(jié)合下面的 MessageService 中的代碼就可得知:
這是依賴注入 AmqpAdmin 和 AmqpTemplate 這兩個對象的有參構(gòu)造器中聲明的。
3、MessageService 編寫邏輯
聲明Exchange 、 消息隊列 、 Exchange和消息隊列的綁定、發(fā)送消息的方法等
PublishController 控制器
application.properties 配置屬性
測試:消息發(fā)送
成功生成隊列
發(fā)送消息測試
★ 接收消息
@RabbitListener 注解修飾的方法將被注冊為消息監(jiān)聽器方法。
【備注】:該注解可通過queues屬性指定它要監(jiān)聽的、已有的消息隊列,
它也可使用queuesToDeclare來聲明隊列,并監(jiān)聽該隊列。
- 如果沒有顯式配置監(jiān)聽器容器工廠(RabbitListenerContainerFactory),
Spring Boot會在容器中自動配置一個SimpleRabbitListenerContainerFactory Bean作為監(jiān)聽器容器工廠,
如果希望使用DirectRabbitListenerContainerFactory,可在application.properties文件中添加如下配置:
spring.rabbitmq.listener.type=direct
▲ 如果在容器中配置了MessageRecoverer或MessageConverter,
它們會被自動關(guān)聯(lián)到默認(rèn)的監(jiān)聽器容器工廠。
代碼演示:
創(chuàng)建個消息隊列的監(jiān)聽器就可以了。
@RabbitListener 注解修飾的方法將被注冊為消息監(jiān)聽器方法。
該注解可通過queues屬性指定它要監(jiān)聽的、已有的消息隊列,
它也可使用queuesToDeclare來聲明隊列,并監(jiān)聽該隊列,
還可以用 bindings 進行 Exchange和queue的綁定操作。
測試: 消息接收
發(fā)送消息和監(jiān)聽消息文章來源:http://www.zghlxwxcb.cn/news/detail-735659.html
★ 定制監(jiān)聽器容器工廠
▲ 如果要定義更多的監(jiān)聽器容器工廠或覆蓋默認(rèn)的監(jiān)聽器容器工廠,
可通過Spring Boot提供的SimpleRabbitListenerContainerFactoryConfigurer
或DirectRabbitListenerContainerFactoryConfigurer來實現(xiàn),
它們可對SimpleRabbitListenerContainerFactory
或DirectRabbitListenerContainerFactory進行與自動配置相同的設(shè)置。
▲ 有了自定義的監(jiān)聽器容器工廠之后,可通過@RabbitListener注解的containerFactory屬性
來指定使用自定義的監(jiān)聽器容器工廠,
例如如下注解代碼:
@RabbitListener(queues = "myQueue1", containerFactory="myFactory")
完整代碼:
application.properties RabbitMQ的連接等屬性配置
# 配置連接 RabbitMQ 的基本信息------------------------------------------------------
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 下面屬性可配置多個以逗號隔開的連接地址,一旦配置了該屬性,host 和 port 屬性就會被忽略
# spring.rabbitmq.addresses=
spring.rabbitmq.username=ljh
spring.rabbitmq.password=123456
# 連接虛擬主機
spring.rabbitmq.virtual-host=my-vhost01
# 配置RabbitMQ的緩存相關(guān)信息--------------------------------------------------------
# 指定緩存 connection ,還是緩存 channel
spring.rabbitmq.cache.connection.mode=channel
# 指定可以緩存多少個 Channel
spring.rabbitmq.cache.channel.size=50
# 如果選擇的緩存模式是 connection , 那么就可以配置如下屬性
# spring.rabbitmq.cache.connection.size=15
# 配置 和 RabbitTemplate 相關(guān)的屬性--------------------------------------------------
# 指定 RabbitTemplate 發(fā)送消息失敗時會重新嘗試
spring.rabbitmq.template.retry.enabled=true
# RabbitTemplate 發(fā)送消息失敗后每隔1秒重新嘗試發(fā)送消息
spring.rabbitmq.template.retry.initial-interval=1s
# RabbitTemplate 發(fā)送消息失敗時,最多嘗試重新發(fā)送消息的次數(shù)
spring.rabbitmq.template.retry.max-attempts=5
# 設(shè)置每次嘗試重新發(fā)送消息的時間間隔是一個等比數(shù)列:1s, 2s, 4s, 8s, 16s
# 第一次等1s后嘗試,第二次等2s后嘗試,第三次等4s后嘗試重新發(fā)送消息......
spring.rabbitmq.template.retry.multiplier=2
# 指定發(fā)送消息時默認(rèn)的Exchange名
spring.rabbitmq.template.exchange=""
# 指定發(fā)送消息時默認(rèn)的路由key
spring.rabbitmq.template.routing-key="test"
# 配置和消息監(jiān)聽器的容器工廠相關(guān)的屬性--------------------------------------------------
# 指定監(jiān)聽器容器工廠的類型
spring.rabbitmq.listener.type=simple
# 指定消息的確認(rèn)模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 指定獲取消息失敗時,是否重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 發(fā)送消息失敗時,最多嘗試重新發(fā)送消息的次數(shù)
spring.rabbitmq.listener.simple.retry.max-attempts=2
# 發(fā)送消息失敗后每隔1秒重新嘗試發(fā)送消息
spring.rabbitmq.listener.simple.retry.initial-interval=1s
ContentUtil 常量工具類
package cn.ljh.app.rabbitmq.util;
//常量
public class ContentUtil
{
//定義Exchange的常量-----fanout:扇形,就是廣播類型
public static final String EXCHANGE_NAME = "boot.fanout";
//消息隊列數(shù)組
public static final String[] QUEUE_NAMES =new String[] {"queue_boot_01","queue_boot_02","queue_boot_03"};
}
RabbitMQConfig 配置式創(chuàng)建消息隊列
package cn.ljh.app.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//配置式:在容器中配置 org.springframework.amqp.core.Queue 類型的Bean,RabbitMQ將會自動為該Bean創(chuàng)建對應(yīng)的隊列
//聲明這個類為配置類
@Configuration
public class RabbitMQConfig
{
//用配置式的方式在RabbitMQ中定義隊列
@Bean
public Queue myQueue()
{
//在容器中配置一個 Queue Bean,Spring 就會為它在 RabbitMQ 中自動創(chuàng)建對應(yīng)的 Queue
return new Queue("queue_boot", /* Queue 消息隊列名 */
true, /* 是否是持久的消息隊列 */
false, /* 是否是獨占的消息隊列,獨占就是是否只允許該消息消費者消費該隊列的消息 */
false, /* 是否在沒有消息的時候自動刪除消息隊列 */
null /* 額外的一些消息隊列的參數(shù) */
);
}
}
MessageService 發(fā)送消息的業(yè)務(wù)代碼
聲明Exchange 、Queue ,Exchange 綁定Queue,發(fā)送消息代碼文章來源地址http://www.zghlxwxcb.cn/news/detail-735659.html
package cn.ljh.app.rabbitmq.service;
import cn.ljh.app.rabbitmq.util.ContentUtil;
import org.springframework.amqp.core.*;
import org.springframework.stereotype.Service;
//業(yè)務(wù)邏輯:聲明Exchange 和 Queue 消息隊列,發(fā)送消息的方法
@Service
public class MessageService
{
//AmqpAdmin來管理Exchange、隊列和綁定
private final AmqpAdmin amqpAdmin;
//AmqpTemplate來發(fā)送消息
private final AmqpTemplate amqpTemplate;
//通過有參構(gòu)造器進行依賴注入
public MessageService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate)
{
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
//由于聲明 Exchange 、 隊列 、 綁定(Exchange綁定隊列),都只需要做一次即可,因此放在此處構(gòu)造器中完成即可
//創(chuàng)建 fanout 類型的 Exchange ,使用FanoutExchange實現(xiàn)類
FanoutExchange exchange = new FanoutExchange(
ContentUtil.EXCHANGE_NAME,
true, /* Exchange是否持久化 */
false, /* 是否自動刪除 */
null /* 額外的參數(shù)屬性 */
);
//聲明 Exchange
this.amqpAdmin.declareExchange(exchange);
//此處循環(huán)聲明 Queue ,也相當(dāng)于代碼式創(chuàng)建 Queue
for (String queueName : ContentUtil.QUEUE_NAMES)
{
Queue queue = new Queue(queueName, /* Queue 消息隊列名 */
true, /* 是否是持久的消息隊列 */
false, /* 是否是獨占的消息隊列,獨占就是是否只允許該消息消費者消費該隊列的消息 */
false, /* 是否在沒有消息的時候自動刪除消息隊列 */
null /* 額外的一些消息隊列的參數(shù) */
);
//此處聲明 Queue ,也相當(dāng)于【代碼式】創(chuàng)建 Queue
this.amqpAdmin.declareQueue(queue);
//聲明 Queue 的綁定
Binding binding = new Binding(
queueName, /* 指定要分發(fā)消息目的地的名稱--這里是要發(fā)送到這個消息隊列里面去 */
Binding.DestinationType.QUEUE, /* 分發(fā)消息目的的類型,指定要綁定 queue 還是 Exchange */
ContentUtil.EXCHANGE_NAME, /* 要綁定的Exchange */
"x", /* 因為綁定的Exchange類型是 fanout 扇形(廣播)模式,所以路由key隨便寫,沒啥作用 */
null
);
//聲明 Queue 的綁定
amqpAdmin.declareBinding(binding);
}
}
//發(fā)送消息的方法
public void publish(String content)
{
//發(fā)送消息
amqpTemplate.convertAndSend(
ContentUtil.EXCHANGE_NAME, /* 指定將消息發(fā)送到這個Exchange */
"", /* 因為Exchange是fanout 類型的(廣播類型),所以寫什么路由key都行,都沒意義 */
content /* 發(fā)送的消息體 */
);
}
}
PublishController.java 發(fā)送消息的控制層
package cn.ljh.app.rabbitmq.controller;
import cn.ljh.app.rabbitmq.service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
//發(fā)送消息
@RestController
public class PublishController
{
private final MessageService messageService;
//有參構(gòu)造器進行依賴注入
public PublishController(MessageService messageService)
{
this.messageService = messageService;
}
@GetMapping("/publish/{message}")
//因為{message}是一個路徑參數(shù),所以方法接收的時候需要加上注解 @PathVariable
public String publish(@PathVariable String message)
{
//發(fā)布消息
messageService.publish(message);
return "消息發(fā)布成功";
}
}
MyRabbitMQListener 監(jiān)聽器,監(jiān)聽消息隊列
package cn.ljh.app.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//監(jiān)聽器:監(jiān)聽消息隊列并進行消費
@Component
public class MyRabbitMQListener
{
//queues 指定監(jiān)聽已有的哪個消費隊列
@RabbitListener(queues = "queue_boot_01")
public void onQ1Message(String message)
{
System.err.println("從 queue_boot_01 消息隊列接收到的消息:" + message);
}
//queues 指定監(jiān)聽已有的哪個消費隊列
@RabbitListener(queues = "queue_boot_02")
public void onQ2Message(String message)
{
System.err.println("從 queue_boot_02 消息隊列接收到的消息:" + message);
}
//queues 指定監(jiān)聽已有的哪個消費隊列
//還可以用 queuesToDeclare 直接聲明并監(jiān)聽該隊列,還可以用 bindings 進行Exchange和queue的綁定
@RabbitListener(queuesToDeclare = @Queue(name = "queue_boot_03"
,durable = "true"
,exclusive = "false"
,autoDelete = "false"),
admin = "amqpAdmin" /*指定聲明Queue,綁定Queue所用的 amqpAdmin,不指定的話就用容器中默認(rèn)的那一個 */
)
public void onQ3Message(String message)
{
System.err.println("從 queue_boot_03 消息隊列接收到的消息:" + message);
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<groupId>cn.ljh</groupId>
<artifactId>rabbitmq_boot</artifactId>
<version>1.0.0</version>
<name>rabbitmq_boot</name>
<properties>
<java.version>11</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- RabbitMQ 的依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- web 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 開發(fā)者工具的依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- lombok 依賴-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
到了這里,關(guān)于207、SpringBoot 整合 RabbitMQ 實現(xiàn)消息的發(fā)送 與 接收(監(jiān)聽器)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!