一、序言
在上一節(jié) RabbitMQ中的核心概念和交換機(jī)類型 中我們介紹了RabbitMQ中的一些核心概念,尤其是各種交換機(jī)的類型,接下來我們將具體講解各種交換機(jī)的配置和消息訂閱實(shí)操。
二、配置文件application.yml
我們先上應(yīng)用啟動配置文件application.yml
,如下:
server:
port: 8080
spring:
rabbitmq:
addresses: localhost:5672
username: admin
password: admin
virtual-host: /
listener:
type: simple
simple:
acknowledge-mode: auto
concurrency: 5
max-concurrency: 20
prefetch: 5
備注:這里我們指定了
RabbitListenerContainerFactory
的類型為SimpleRabbitListenerContainerFactory
,并且指定消息確認(rèn)模式為自動確認(rèn)。
三、RabbitMQ交換機(jī)和隊(duì)列配置
Spring官方提供了一套 流式API 來定義隊(duì)列、交換機(jī)和綁定關(guān)系,非常的方便,接下來我們定義4種類型的交換機(jī)和相應(yīng)隊(duì)列的綁定關(guān)系。
1、定義4個(gè)隊(duì)列
/**
* 定義4個(gè)隊(duì)列
*/
@Configuration
protected static class QueueConfig {
@Bean
public Queue queue1() {
return QueueBuilder.durable("queue-1").build();
}
@Bean
public Queue queue2() {
return QueueBuilder.durable("queue-2").build();
}
@Bean
public Queue queue3() {
return QueueBuilder.durable("queue-3").build();
}
@Bean
public Queue queue4() {
return QueueBuilder.durable("queue-4").build();
}
}
2、定義Fanout交換機(jī)和隊(duì)列綁定關(guān)系
/**
* 定義Fanout交換機(jī)和對應(yīng)的綁定關(guān)系
*/
@Configuration
protected static class FanoutExchangeBindingConfig {
@Bean
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange("fanout-exchange").build();
}
/**
* 定義多個(gè)Fanout交換機(jī)和隊(duì)列的綁定關(guān)系
* @param fanoutExchange
* @param queue1
* @param queue2
* @param queue3
* @param queue4
* @return
*/
@Bean
public Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {
Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);
Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);
Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);
Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);
return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);
}
}
備注:這里我們將4個(gè)隊(duì)列綁定到了名為
fanout-exchange
的交換機(jī)上。
2、定義Direct交換機(jī)和隊(duì)列綁定關(guān)系
@Configuration
protected static class DirectExchangeBindingConfig {
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("direct-exchange").build();
}
@Bean
public Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {
return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");
}
}
備注:這里我們定義了名為
direct-exchange
的交換機(jī)并通過路由keyqueue3-route-key
將queue-3
綁定到了該交換機(jī)上。
3、定義Topic交換機(jī)和隊(duì)列綁定關(guān)系
@Configuration
protected static class TopicExchangeBindingConfig {
@Bean
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange("topic-exchange").build();
}
@Bean
public Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {
Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");
Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");
return new Declarables(queue1Binding, queue2Binding);
}
}
這里我們定義了名為topic-exchange
類型的交換機(jī),該類型交換機(jī)支持路由key通配符匹配,*
代表一個(gè)任意字符,#
代表一個(gè)或多個(gè)任意字符。
備注:
- 通過路由key
com.order.*
將queue-1
綁定到了該交換機(jī)上。- 通過路由key
com.#
將queue-2
也綁定到了該交換機(jī)上。
4、定義Header交換機(jī)和隊(duì)列綁定關(guān)系
@Configuration
protected static class HeaderExchangeBinding {
@Bean
public HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange("headers-exchange").build();
}
@Bean
public Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {
return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");
}
}
備注:這里我們定義了名為
headers-exchange
類型的交換機(jī),并通過參數(shù)function=logging
將queue-4
綁定到了該交換機(jī)上。
四、RabbitMQ消費(fèi)者配置
Spring RabbitMQ中支持注解式監(jiān)聽端點(diǎn)配置,用于異步接收消息,如下:
@Slf4j
@Component
public class RabbitMqConsumer {
@RabbitListener(queues = "queue-1")
public void handleMsgFromQueue1(String msg) {
log.info("Message received from queue-1, message body: {}", msg);
}
@RabbitListener(queues = "queue-2")
public void handleMsgFromQueue2(String msg) {
log.info("Message received from queue-2, message body: {}", msg);
}
@RabbitListener(queues = "queue-3")
public void handleMsgFromQueue3(String msg) {
log.info("Message received from queue-3, message body: {}", msg);
}
@RabbitListener(queues = "queue-4")
public void handleMsgFromQueue4(String msg) {
log.info("Message received from queue-4, message body: {}", msg);
}
}
備注:這里我們分別定義了4個(gè)消費(fèi)者,分別用來接受4個(gè)隊(duì)列的消息。
五、RabbitMQ生產(chǎn)者
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {
private final RabbitTemplate rabbitTemplate;
public void sendMsgToFanoutExchange(String body) {
log.info("開始發(fā)送消息到fanout-exchange, 消息體:{}", body);
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);
}
public void sendMsgToDirectExchange(String body) {
log.info("開始發(fā)送消息到direct-exchange, 消息體:{}", body);
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
rabbitTemplate.send("direct-exchange", "queue3-route-key", message);
}
public void sendMsgToTopicExchange(String routingKey, String body) {
log.info("開始發(fā)送消息到topic-exchange, 消息體:{}", body);
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
rabbitTemplate.send("topic-exchange", routingKey, message);
}
public void sendMsgToHeadersExchange(String body) {
log.info("開始發(fā)送消息到headers-exchange, 消息體:{}", body);
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();
Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);
}
}
六、測試用例
這里寫了個(gè)簡單的Controller用來測試,如下:
@RestController
@RequiredArgsConstructor
public class RabbitMsgController {
private final RabbitMqProducer rabbitMqProducer;
@RequestMapping("/exchange/fanout")
public ResponseEntity<String> sendMsgToFanoutExchange(String body) {
rabbitMqProducer.sendMsgToFanoutExchange(body);
return ResponseEntity.ok("廣播消息發(fā)送成功");
}
@RequestMapping("/exchange/direct")
public ResponseEntity<String> sendMsgToDirectExchange(String body) {
rabbitMqProducer.sendMsgToDirectExchange(body);
return ResponseEntity.ok("消息發(fā)送到Direct交換成功");
}
@RequestMapping("/exchange/topic")
public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {
rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);
return ResponseEntity.ok("消息發(fā)送到Topic交換機(jī)成功");
}
@RequestMapping("/exchange/headers")
public ResponseEntity<String> sendMsgToHeadersExchange(String body) {
rabbitMqProducer.sendMsgToHeadersExchange(body);
return ResponseEntity.ok("消息發(fā)送到Headers交換機(jī)成功");
}
}
1、發(fā)送到FanoutExchage
直接訪問http://localhost:8080/exchange/fanout?body=hello
,可以看到該消息廣播到了4個(gè)隊(duì)列上。
2023-11-07 17:41:12.959 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 開始發(fā)送消息到fanout-exchange, 消息體:hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello
2、發(fā)送到DirectExchage
訪問http://localhost:8080/exchange/direct?body=hello
,可以看到消息通過路由keyqueue3-route-key
發(fā)送到了queue-3
上。
2023-11-07 17:43:26.804 INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer : 開始發(fā)送消息到direct-exchange, 消息體:hello
2023-11-07 17:43:26.822 INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello
3、發(fā)送到TopicExchange
訪問http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create
,路由key為 com.order.create
的消息分別發(fā)送到了queue-1
和queue-2
上。
2023-11-07 17:44:45.301 INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer : 開始發(fā)送消息到topic-exchange, 消息體:hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello
4、發(fā)動到HeadersExchage
訪問http://localhost:8080/exchange/headers?body=hello
,消息通過頭部信息function=logging
發(fā)送到了headers-exchange
上。文章來源:http://www.zghlxwxcb.cn/news/detail-797735.html
2023-11-07 17:47:21.736 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 開始發(fā)送消息到headers-exchange, 消息體:hello
2023-11-07 17:47:21.749 INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello
七、結(jié)語
下一節(jié)我們將會介紹通過兩種方式借由RabbitMQ實(shí)現(xiàn)延遲消息發(fā)送和訂閱,敬請期待。文章來源地址http://www.zghlxwxcb.cn/news/detail-797735.html
到了這里,關(guān)于Spring RabbitMQ那些事(1-交換機(jī)配置&消息發(fā)送訂閱實(shí)操)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!