提示:文章寫完后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔
SpringAMQP
1.SpringBoot 的支持
- SpringBoot 已經(jīng)提供了對(duì) AMQP 協(xié)議完全支持的 spring-boot-starter-amqp 依賴,引入此依賴即可快速方便的在 SpringBoot 中使用 RabbitMQ。
https://spring.io/projects/spring-amqp
2.RabbitTemplate
- RabbitTemplate 是 SpringBoot AMQP 提供的快速發(fā) RabbitMQ 消息的模板類,與 RestTemplate 有類似之處,意指方便、簡(jiǎn)單、快速的發(fā) RabbitMQ 消息。
@Slf4j
@Component
public class ClientReportTopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String ROUTING_KEY = "report";
public void send(String param) {
rabbitTemplate.send(TopicConst.CLIENT_REPORT_TOPIC, ROUTING_KEY, new Message(param.getBytes(), new MessageProperties()));
}
}
send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。
convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,然后將其發(fā)送到指定的交換機(jī)和路由鍵中。
sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。
convertSendAndReceive:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息。
convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。
convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。
sendWithMessagePostProcessor:發(fā)送消息,并在發(fā)送之前進(jìn)行處理。
execute:執(zhí)行Rabbit操作并返回一個(gè)結(jié)果。
receive:從隊(duì)列接收一條消息。
receiveAndConvert:從隊(duì)列接收一條消息,并將其轉(zhuǎn)換為Java對(duì)象。
receiveAndReply:從隊(duì)列接收一條請(qǐng)求消息,并發(fā)送一個(gè)響應(yīng)消息。
convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。
convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。
convertSendAndReceiveAndReplyHeader:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,并發(fā)送請(qǐng)求消息。接收到請(qǐng)求消息后,將其轉(zhuǎn)換為響應(yīng)消息,并設(shè)置響應(yīng)消息的頭信息。
convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。
convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理。
convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型。
convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型和交換機(jī)。
send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。
send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。在發(fā)送之前,先對(duì)消息進(jìn)行處理。
send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型。
sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。
sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理。
sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型。
sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型和交換機(jī)。
setConnectionFactory:設(shè)置RabbitMQ連接工廠。
getConnectionFactory:獲取RabbitMQ連接工廠。
setExchange:設(shè)置默認(rèn)的交換機(jī)。
getExchange:獲取默認(rèn)的交換機(jī)。
setRoutingKey:設(shè)置默認(rèn)的路由鍵。
getRoutingKey:獲取默認(rèn)的路由鍵。
setQueue:設(shè)置默認(rèn)的隊(duì)列。
getQueue:獲取默認(rèn)的隊(duì)列。
setMandatory:設(shè)置消息是否強(qiáng)制路由到隊(duì)列。
isMandatory:檢查消息是否強(qiáng)制路由到隊(duì)列。
setReplyTimeout:設(shè)置接收響應(yīng)消息的超時(shí)時(shí)間。
getReplyTimeout:獲取接收響應(yīng)消息的超時(shí)時(shí)間。
setChannelTransacted:設(shè)置通道是否應(yīng)該在事務(wù)中使用。
isChannelTransacted:檢查通道是否應(yīng)該在事務(wù)中使用。
setConfirmCallback:設(shè)置確認(rèn)回調(diào)。
getConfirmCallback:獲取確認(rèn)回調(diào)。
setReturnCallback:設(shè)置返回回調(diào)。
getReturnCallback:獲取返回回調(diào)。
setBeforePublishPostProcessor:設(shè)置發(fā)布之前的后處理器。
getBeforePublishPostProcessor:獲取發(fā)布之前的后處理器。
setAfterReceivePostProcessor:設(shè)置接收后的后處理器。
getAfterReceivePostProcessor:獲取接收后的后處理器。
setUsePublisherConnection:設(shè)置是否應(yīng)該使用發(fā)布者連接。
isUsePublisherConnection:檢查是否應(yīng)該使用發(fā)布者連接。
setApplicationContext:設(shè)置應(yīng)用程序上下文。
3.@RabbitListener(終極監(jiān)聽(tīng)方案)
使用此方案做監(jiān)聽(tīng)消息功能,就可以把之前的 SimpleMessageListenerContainer 進(jìn)行監(jiān)聽(tīng)的方案舍棄掉了,就是這么的喜新厭舊,不過(guò)之前的 SimpleMessageListenerContainer 也不是一無(wú)是處,學(xué)過(guò)之后可以更好的理解內(nèi)部的一些邏輯。
@RabbitListener 的特點(diǎn):
- RabbitListener 是 SpringBoot 架構(gòu)中監(jiān)聽(tīng)消息的終極方案。
- RabbitListener 使用注解聲明,對(duì)業(yè)務(wù)代碼無(wú)侵入。
- RabbitListener 可以在 SpringBoot 配置文件中進(jìn)行配置。
@RabbitListener 本身是 Java 中的注解,可以搭配其他注解一起使用:
- @Exchange:自動(dòng)聲明 Exchange。
- @Queue:自動(dòng)聲明隊(duì)列。
- @QueueBinding:自動(dòng)聲明綁定關(guān)系。
package com.rabbitmqdemoconsumer.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitLeistener {
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage1(String msg){
System.out.println("consume1接收到的消息:"+msg);
}
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage2(String msg){
System.out.println("consume2接收到的消息:"+msg);
}
}
4.RabbitConfig—rabbitmq配置類
聲明式實(shí)現(xiàn)(推薦)
@Slf4j
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "exchange.cat.dog";
public static final String EXCHANGE_DLX = "exchange.dlx";
public static final String QUEUE_NAME = "queue.cat";
public static final String QUEUE_DLX = "queue.dlx";
public static final String KEY_NAME = "key.yingduan";
public static final String KEY_DLX = "#";
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
return connectionFactory;
}
@Bean
RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
Exchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
Queue queue() {
return new Queue(QUEUE_NAME);
}
@Bean
Binding binding() {
// 目的地名稱、目的地類型、綁定交換機(jī)、綁定 key、參數(shù)
return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
}
//死信隊(duì)列機(jī)制 死信隊(duì)列需要在創(chuàng)建 Queue 時(shí)指定對(duì)應(yīng)屬性:
@Bean
Queue queue() {
// 配置聲明隊(duì)列時(shí)使用的參數(shù)
Map<String, Object> args = new HashMap<>(1);
// 設(shè)置死信隊(duì)列指向的交換機(jī)
args.put("x-dead-letter-exchange", EXCHANGE_DLX);
return new Queue(QUEUE_NAME, true, false, false, args);
}
}
注意,以上配置再啟動(dòng) SpringBoot 并不會(huì)立馬創(chuàng)建交換機(jī)、隊(duì)列、綁定,SpringBoot AMQP 有懶加載,需要等到使用 connection 時(shí)才會(huì)創(chuàng)建。什么是使用 connection 呢?
- 比如創(chuàng)建 connection
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
connectionFactory.createConnection();
return connectionFactory;
}
- 再比如監(jiān)聽(tīng)了隊(duì)列
@RabbitListener(queues = {"test"})
void test() {
log.info("【測(cè)試監(jiān)聽(tīng)消息】");
}
SpringBoot集成RabbitMQ 案例
配置
導(dǎo)入maven坐標(biāo)
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml配置
spring:
rabbitmq:
addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672
username: admin
password: admin
#開(kāi)啟消息確認(rèn)模式,新版本已經(jīng)棄用
#publisher-confirms: true
#開(kāi)啟消息送達(dá)提示
publisher-returns: true
# springboot.rabbitmq.publisher-confirm 新版本已被棄用,現(xiàn)在使用 spring.rabbitmq.publisher-confirm-type = correlated 實(shí)現(xiàn)相同效果
publisher-confirm-type: correlated
virtual-host: /
listener:
type: simple
simple:
acknowledge-mode: auto #確認(rèn)模式
prefetch: 1 #限制每次發(fā)送一條數(shù)據(jù)。
concurrency: 3 #同一個(gè)隊(duì)列啟動(dòng)幾個(gè)消費(fèi)者
max-concurrency: 3 #啟動(dòng)消費(fèi)者最大數(shù)量
#重試策略相關(guān)配置
retry:
# 開(kāi)啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開(kāi)啟并一直重試
enabled: true
# 最大重試次數(shù)
max-attempts: 5
# 重試間隔時(shí)間(毫秒)
initial-interval: 3000
RabbitMQ 參數(shù)配置說(shuō)明
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
username: guest #賬號(hào)
password: guest #密碼
virtualHost: #鏈接的虛擬主機(jī)
addresses: 127.0.0.1:5672 #多個(gè)以逗號(hào)分隔,與host功能一樣。
requestedHeartbeat: 60 #指定心跳超時(shí),單位秒,0為不指定;默認(rèn)60s
publisherConfirms: true #發(fā)布確認(rèn)機(jī)制是否啟用
#確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
#publisher-confirm-type參數(shù)有三個(gè)可選值:
#SIMPLE:會(huì)觸發(fā)回調(diào)方法,相當(dāng)于單個(gè)確認(rèn)(發(fā)一條確認(rèn)一條)。
#CORRELATED:消息從生產(chǎn)者發(fā)送到交換機(jī)后觸發(fā)回調(diào)方法。
#NONE(默認(rèn)):關(guān)閉發(fā)布確認(rèn)模式。
#publisher-confirm-type: correlated #發(fā)布確認(rèn)機(jī)制是否啟用 高版本Springboot使用替換掉publisher-confirms:true
publisherReturns: true #發(fā)布返回是否啟用
connectionTimeout: #鏈接超時(shí)。單位ms。0表示無(wú)窮大不超時(shí)
### ssl相關(guān)
ssl:
enabled: #是否支持ssl
keyStore: #指定持有SSL certificate的key store的路徑
keyStoreType: #key store類型 默認(rèn)PKCS12
keyStorePassword: #指定訪問(wèn)key store的密碼
trustStore: #指定持有SSL certificates的Trust store
trustStoreType: #默認(rèn)JKS
trustStorePassword: #訪問(wèn)密碼
algorithm: #ssl使用的算法,例如,TLSv1.1
verifyHostname: #是否開(kāi)啟hostname驗(yàn)證
### cache相關(guān)
cache:
channel:
size: #緩存中保持的channel數(shù)量
checkoutTimeout: #當(dāng)緩存數(shù)量被設(shè)置時(shí),從緩存中獲取一個(gè)channel的超時(shí)時(shí)間,單位毫秒;如果為0,則總是創(chuàng)建一個(gè)新channel
connection:
mode: #連接工廠緩存模式:CHANNEL 和 CONNECTION
size: #緩存的連接數(shù),只有是CONNECTION模式時(shí)生效
### listener
listener:
type: #兩種類型,SIMPLE,DIRECT
## simple類型
simple:
concurrency: #最小消費(fèi)者數(shù)量
maxConcurrency: #最大的消費(fèi)者數(shù)量
transactionSize: #指定一個(gè)事務(wù)處理的消息數(shù)量,最好是小于等于prefetch的數(shù)量
missingQueuesFatal: #是否停止容器當(dāng)容器中的隊(duì)列不可用
## 與direct相同配置部分
autoStartup: #是否自動(dòng)啟動(dòng)容器
acknowledgeMode: #表示消息確認(rèn)方式,其有三種配置方式,分別是none、manual和auto;默認(rèn)auto
prefetch: #指定一個(gè)請(qǐng)求能處理多少個(gè)消息,如果有事務(wù)的話,必須大于等于transaction數(shù)量
defaultRequeueRejected: #決定被拒絕的消息是否重新入隊(duì);默認(rèn)是true(與參數(shù)acknowledge-mode有關(guān)系)
idleEventInterval: #container events發(fā)布頻率,單位ms
##重試機(jī)制
retry:
stateless: #有無(wú)狀態(tài)
enabled: #是否開(kāi)啟
maxAttempts: #最大重試次數(shù),默認(rèn)3
initialInterval: #重試間隔
multiplier: #對(duì)于上一次重試的乘數(shù)
maxInterval: #最大重試時(shí)間間隔
direct:
consumersPerQueue: #每個(gè)隊(duì)列消費(fèi)者數(shù)量
missingQueuesFatal:
#...其余配置看上方公共配置
## template相關(guān)
template:
mandatory: #是否啟用強(qiáng)制信息;默認(rèn)false
receiveTimeout: #`receive()`接收方法超時(shí)時(shí)間
replyTimeout: #`sendAndReceive()`超時(shí)時(shí)間
exchange: #默認(rèn)的交換機(jī)
routingKey: #默認(rèn)的路由
defaultReceiveQueue: #默認(rèn)的接收隊(duì)列
## retry重試相關(guān)
retry:
enabled: #是否開(kāi)啟
maxAttempts: #最大重試次數(shù)
initialInterval: #重試間隔
multiplier: #失敗間隔乘數(shù)
maxInterval: #最大間隔
1.基本消息隊(duì)列
1、創(chuàng)建隊(duì)列
- 訪問(wèn)接口:http://localhost:15672,賬號(hào)密碼都為guest
2、發(fā)布消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
String queue="MqTest1";
String message="message1";
rabbitTemplate.convertAndSend(queue,message);
}
}
3、接受消息
package com.rabbitmqdemoconsumer.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitLeistener {
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage(String msg){
System.out.println("接收到的消息:"+msg);
}
}
2.工作消息隊(duì)列(Work Queue)
- 可以提高消息處理速度,避免隊(duì)列消息堆積
1、發(fā)布消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
String queue="MqTest1";
String message="message1";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(queue,message);
}
}
}
2、接受消息
package com.rabbitmqdemoconsumer.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitLeistener {
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage1(String msg){
System.out.println("consume1接收到的消息:"+msg);
}
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage2(String msg){
System.out.println("consume2接收到的消息:"+msg);
}
}
3、控制臺(tái)輸出結(jié)果
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
4、消息預(yù)取問(wèn)題
- 但是此時(shí)有一個(gè)問(wèn)題就是消息預(yù)取,比如隊(duì)列有10條消息,兩個(gè)消費(fèi)者各自直接先預(yù)取5個(gè)消息,如果一個(gè)消費(fèi)者接受消息的速度慢,一個(gè)快,就會(huì)導(dǎo)致一個(gè)消費(fèi)者已經(jīng)完成工作,另一個(gè)還在慢慢處理,會(huì)造成消息堆積消費(fèi)者身上,要解決這個(gè)問(wèn)題需要在yml文件配置相關(guān)配置
rabbitmq:
host: 43.140.244.236
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
prefetch: 1 #每次只能取一個(gè),處理完才能取下一個(gè)消息
3.發(fā)布訂閱模式之模式(Fanout)
exchange是交換機(jī),負(fù)責(zé)消息路由,但不存儲(chǔ)消息,路由失敗則消息丟失
生產(chǎn)者將消息發(fā)送到fanout交換器
- fanout交換機(jī)非常簡(jiǎn)單。它只是將接收到的所有消息廣播給它所知道的所有隊(duì)列
1、Fanout配置類(@Bean聲明)
package com.rabbitmqdemoconsumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanountConfig {
//交換機(jī)聲明
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("FanountExchange");
}
//聲明隊(duì)列1
@Bean
public Queue Fanount_Qeueue1(){
return new Queue("Fanount_Qeueue1");
}
//聲明隊(duì)列2
@Bean
public Queue Fanount_Qeueue2(){
return new Queue("Fanount_Qeueue2");
}
//綁定交換機(jī)和隊(duì)列
@Bean
public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange);
}
@Bean
public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange);
}
}
2、發(fā)送消息
首先發(fā)送10條消息,經(jīng)過(guò)交換機(jī)轉(zhuǎn)發(fā)到隊(duì)列
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="FanountExchange";
String message="message";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"",message);
}
}
}
3、接受消息
//監(jiān)聽(tīng)交換機(jī)Fanount_Qeueue1
@RabbitListener(queues = "Fanount_Qeueue1")
public void listenFanountQeueue1(String msg){
System.out.println("Fanount_Qeueue1接收到的消息:"+msg);
}
//監(jiān)聽(tīng)交換機(jī)Fanount_Qeueue2
@RabbitListener(queues = "Fanount_Qeueue2")
public void listenFanountQeueue2(String msg){
System.out.println("Fanount_Qeueue2接收到的消息:"+msg);
}
4.路由模式(Direct)
- 會(huì)將消息根據(jù)規(guī)則路由到指定的隊(duì)列
生產(chǎn)者將消息發(fā)送到direct交換器
1、聲明(基于@RabbitListener聲明)
package com.rabbitmqdemoconsumer.rabbitmq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitLeistener {
/**
* 綁定交換機(jī)和隊(duì)列,并為key賦值
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "DirectQueue1"),
exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("listenDirectQueue1接收到的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "DirectQueue2"),
exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("listenDirectQueue2接收到的消息:"+msg);
}
}
2、發(fā)送給blue
發(fā)送消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="DirectExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"blue",message);
}
}
}
3、發(fā)送給red
發(fā)送消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="DirectExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"red",message);
}
}
}
5.主題模式(Topic)
生產(chǎn)者將消息發(fā)送到 topic交換器
Queue與Exchange指定BindingKey可以使用通配符:
#:代指0個(gè)或多個(gè)單詞
*:代指一個(gè)單詞
1、聲明
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "TopicQueue1"),
exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("listenTopicQueue1接收到的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "TopicQueue2"),
exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("listenTopicQueue2接收到的消息:"+msg);
}
2、發(fā)送消息(測(cè)試1)
package com.rabbitmqdemo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="TopicExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"china.news",message);
}
}
}
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-841962.html
3、發(fā)送消息(測(cè)試2)
package com.rabbitmqdemo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="TopicExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"china.weather",message);
}
}
}
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-841962.html
到了這里,關(guān)于RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!