1、前情提要【RabbitMQ】
【RabbitMQ】消息隊列-RabbitMQ篇章
RabbitMQ實現(xiàn)流程
2、RabbitMQ-SpringBoot案例 -fanout模式
2.1 實現(xiàn)架構(gòu)總覽
實現(xiàn)步驟:
1:創(chuàng)建生產(chǎn)者工程:sspringboot-rabbitmq-fanout-producer
2:創(chuàng)建消費者工程:springboot-rabbitmq-fanout-consumer
3:引入spring-boot-rabbitmq的依賴
4:進行消息的分發(fā)和測試
5:查看和觀察web控制臺的狀況
2.2 具體實現(xiàn)
2.2.1生產(chǎn)者
在這之前提前開好服務(wù)器,并且啟動mq的服務(wù),可參考上面的鏈接
- 1、創(chuàng)建生產(chǎn)者springboot工程:sspringboot-rabbitmq-fanout-producer
- 2、導(dǎo)入啟動(web、mq)依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- 3、在application.yml進行配置
# 服務(wù)端口
server:
port: 8080
# 配置rabbitmq服務(wù)
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
# ip地址為開啟mq服務(wù)的服務(wù)器地址
host: 47.104.141.27
port: 5672
- 4:定義訂單的生產(chǎn)者
package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author: 學(xué)相伴-飛哥
* @description: OrderService
* @Date : 2021/3/4
*/
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定義交換機
private String exchangeName = "fanout_order_exchange";
// 2: 路由key
private String routeKey = "";
public void makeOrder(Long userId, Long productId, int num) {
// 1: 模擬用戶下單
String orderNumer = UUID.randomUUID().toString();
// 2: 根據(jù)商品id productId 去查詢商品的庫存
// int numstore = productSerivce.getProductNum(productId);
// 3:判斷庫存是否充足
// if(num > numstore ){ return "商品庫存不足..."; }
// 4: 下單邏輯
// orderService.saveOrder(order);
// 5: 下單成功要扣減庫存
// 6: 下單完成以后
System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer);
// 發(fā)送訂單信息給RabbitMQ fanout
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
}
- 4、配置類綁定交換機和隊列的關(guān)系
package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class DirectRabbitConfig {
//隊列 起名:TestDirectQueue
@Bean
public Queue emailQueue() {
// durable:是否持久化,默認(rèn)是false,持久化隊列:會被存儲在磁盤上,當(dāng)消息代理重啟時仍然存在,暫存隊列:當(dāng)前連接有效
// exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊列即被刪除。此參考優(yōu)先級高于durable
// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除。
// return new Queue("TestDirectQueue",true,true,false);
//一般設(shè)置一下隊列的持久化就好,其余兩個就是默認(rèn)false
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
// durable:是否持久化,默認(rèn)是false,持久化隊列:會被存儲在磁盤上,當(dāng)消息代理重啟時仍然存在,暫存隊列:當(dāng)前連接有效
// exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊列即被刪除。此參考優(yōu)先級高于durable
// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除。
// return new Queue("TestDirectQueue",true,true,false);
//一般設(shè)置一下隊列的持久化就好,其余兩個就是默認(rèn)false
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
// durable:是否持久化,默認(rèn)是false,持久化隊列:會被存儲在磁盤上,當(dāng)消息代理重啟時仍然存在,暫存隊列:當(dāng)前連接有效
// exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊列即被刪除。此參考優(yōu)先級高于durable
// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除。
// return new Queue("TestDirectQueue",true,true,false);
//一般設(shè)置一下隊列的持久化就好,其余兩個就是默認(rèn)false
return new Queue("weixin.fanout.queue", true);
}
//Direct交換機 起名:TestDirectExchange
@Bean
public DirectExchange fanoutOrderExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("fanout_order_exchange", true, false);
}
//綁定 將隊列和交換機綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
@Bean
public Binding bindingDirect1() {
return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect2() {
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect3() {
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
}
}
- 6.測試
向隊列發(fā)送10條消息
package com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer;
import com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqFanoutProducerApplicationTests {
@Autowired
OrderService orderService;
@Test
public void contextLoads() throws Exception {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
Long userId = 100L + i;
Long productId = 10001L + i;
int num = 10;
orderService.makeOrder(userId, productId, num);
}
}
}
啟動測試,此時進入mq的web頁面,查看交換機和隊列是否綁定上了,查看是否有隊列消息
向所有隊列中都加入10條消息—說明交換機和隊列綁定沒問題
2.2.1消費者
參照生產(chǎn)者的創(chuàng)建方法,選擇在平級目錄下創(chuàng)建:
springboot-order-rabbitmq-consumber
2. 修改配置文件
# 服務(wù)器
server:
# 端口要改成不沖突的
port: 8081/
# rabbitmq配置
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.157.128 #127.0.0.1
port: 5672
- 創(chuàng)建FanoutEmailConsumer、FanoutNoteConsumer、FanoutSMSConsumer消費者接收
文章來源:http://www.zghlxwxcb.cn/news/detail-657216.html
4.運行測試文章來源地址http://www.zghlxwxcb.cn/news/detail-657216.html
到了這里,關(guān)于【RabbitMQ】RabbitMQ整合SpringBoot案例的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!