RabbitMQ的模式中,常用的模式有:簡單模式,發(fā)布與訂閱模式,工作模式,路由模式,主題模式。簡單模式不太會運用到工作中,我們可以使用 RabbitMQ 的發(fā)布訂閱模式,實現(xiàn):
- 用戶發(fā)布動態(tài),其“粉絲”收到其發(fā)布動態(tài)的消息
- 用戶下訂單,庫存模塊、支付模塊等收到消息并處理
- 等等
1. 創(chuàng)建RabbitMQ的生產(chǎn)者
創(chuàng)建一個springboot項目,項目創(chuàng)建idea的默認(rèn)創(chuàng)建springboot項目
然后進(jìn)行rabbitMq的整合過程
1.1 引入rabbitmq的jar包
在項目的pom.xml中引入rabbitmq的jar包,詳情如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
1.2 配置文件中添加配置
在項目的配置文件中添加rabbitmq的相關(guān)配置,配置詳情如下:
server:
port: 10001
# rabbitMq 相關(guān)配置
spring:
application:
name: springboot-rabbitmq-s1
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: guest
password: guest
guest是rabbitmq的默認(rèn)密碼,不需要重新設(shè)置,不過在生產(chǎn)中為了安全是需要改密碼的
1.3 創(chuàng)建配置類
配置類用于將隊列和交換機進(jìn)行綁定,該操作也可以使用rabbitmq的管理界面操作,并不是一定需要的步驟。配置類詳情如下:
package com.study.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author alen
* @DATE 2022/6/7 23:50
*/
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "fanout-order-exchange";
public static final String SMS_QUEUE = "sms-fanout-queue";
public static final String EMAIL_QUEUE = "email-fanout-queue";
public static final String WECHAT_QUEUE = "wechat-fanout-queue";
/**
* 1.
* 聲明交換機
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
/**
* FanoutExchange的參數(shù)說明:
* 1. 交換機名稱
* 2. 是否持久化 true:持久化,交換機一直保留, false:不持久化,用完就刪除
* 3. 是否自動刪除 false:不自動刪除, true:自動刪除
*/
return new FanoutExchange(EXCHANGE_NAME, true, false);
}
/**
* 2.
* 聲明隊列
* @return
*/
@Bean
public Queue smsQueue() {
/**
* Queue構(gòu)造函數(shù)參數(shù)說明
* 1. 隊列名
* 2. 是否持久化 true:持久化, false:不持久化
*/
return new Queue(SMS_QUEUE, true);
}
@Bean
public Queue emailQueue() {
return new Queue(EMAIL_QUEUE, true);
}
@Bean
public Queue wechatQueue() {
return new Queue(WECHAT_QUEUE, true);
}
/**
* 3.
* 隊列與交換機綁定
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding wechatBinding() {
return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());
}
}
1.4 模擬發(fā)送消息
創(chuàng)建一個service類,在類中進(jìn)行rabbitMq消息的發(fā)送,源碼如下:
package com.study.rabbitmq.service;
import cn.hutool.json.JSONUtil;
import com.study.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Author alen
* @DATE 2022/6/7 23:31
*/
@Service
@Slf4j
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
String body = JSONUtil.toJsonStr(order);
log.info("訂單信息:{}", body);
//交換機名稱
String exchangeName = "fanout-order-exchange";
//路由key 由于我們實現(xiàn)的是fanout模式(廣播模式),不需要路由key,所有的消費者都可以進(jìn)行監(jiān)聽和消費
String routeKey = "";
//發(fā)送mq消息
rabbitTemplate.convertAndSend(exchangeName, routeKey, body);
log.info("rabbitmq發(fā)送廣播模式消息成功。。。");
}
}
使用單元測試模擬消息發(fā)送,單元測試詳情如下:
package com.study.rabbitmq;
import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
for (long i = 1; i < 50; i++) {
Order order = new Order();
order.setRequestId(i);
order.setUserId(i);
order.setOrderNo(UUID.randomUUID().toString());
order.setAmount(10L);
order.setGoodsNum(1);
order.setTotalAmount(10L);
orderService.createOrder(order);
}
}
}
發(fā)送完后,我們可以在rabbitMq的管理后臺看到已經(jīng)發(fā)送成功的消息,效果如下:
可見消息已經(jīng)全部發(fā)送完畢,因為前面的三個隊列都是綁定在同一個交換機上,所以三個隊列都會收到消息。
2. 創(chuàng)建RabbitMQ的消費者
創(chuàng)建消費者服務(wù)S2,項目結(jié)構(gòu)參考生產(chǎn)者項目結(jié)構(gòu),然后進(jìn)行消息消費的相關(guān)代碼的實現(xiàn),實現(xiàn)過程如下
2.1 引入RabbitMQ的jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
2.2 在項目配置文件中添加配置
配置詳情如下
server:
port: 10002
# rabbitmq 相關(guān)配置
spring:
application:
name: springboot-rabbitmq-s2
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
2.3 創(chuàng)建MQ消息消費者
消費者類詳情如下
package com.study.rabbitmq.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @Author alen
* @DATE 2022/6/8 8:15
*/
@Slf4j
@Service
@RabbitListener(queues = {"email-fanout-queue"}) //監(jiān)聽隊列
public class FanoutEmailConsumer {
@RabbitHandler
public void emailMessage(String message) {
log.info("Email fanout --接收到消息:{}", message);
}
}
啟動消費者項目,消費效果如下:
登錄rabbitMq后臺查看隊列的消息情況如下文章來源:http://www.zghlxwxcb.cn/news/detail-660941.html
到此,似乎感覺整合得很順利,沒啥毛病。但是實際的運用中,以上演示過程中忽略了兩個很重要的問題,一是我如何知道消息被順利的發(fā)送到了隊列,因為實際的工作中,不大可能每個消息都去rabbitmq管理后臺查看。二是如果消息在消費的過程中出現(xiàn)了異常導(dǎo)致消息丟失,不重要的數(shù)據(jù)還好,如果是支付類的消息呢?就會產(chǎn)生嚴(yán)重的線上問題。那么這兩個問題需要怎么處理呢?其實rabbitmq提供了消息發(fā)送結(jié)果回調(diào)和消息消費手動確認(rèn)來處理這兩個問題。文章來源地址http://www.zghlxwxcb.cn/news/detail-660941.html
到了這里,關(guān)于Spring Boot整合RabbitMQ之發(fā)布與訂閱模式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!