前言:
??作者簡介:我是笑霸final,一名熱愛技術的在校學生。
??個人主頁:個人主頁1 || 笑霸final的主頁2
??系列專欄:后端專欄
??如果文章知識點有錯誤的地方,請指正!和大家一起學習,一起進步??
??如果感覺博主的文章還不錯的話,??點贊?? + ??關注?? + ??收藏??
話不多說 直接開干
一 導入maven坐標與配置
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
基礎配置文件
spring:
rabbitmq:
username: 你的用戶名
password: 你的密碼
host: rabbitmq安裝的主機的 ip地址
port: 5672 #端口號
二、直連交換機direct exchange
直連型交換機(direct exchange)是根據(jù)消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。
- 將一個隊列 綁定到某個交換機上,同時賦予該綁定一個路由鍵(routing key)
- 當一個攜帶著路由鍵為
routingKey01
的消息被發(fā)送給直連交換機時,交換機會把它路由給綁定值同樣為routingKey01
的隊列。
直連交換機經常用來循環(huán)分發(fā)任務給多個工作者(workers)。當這樣做的時候,我們需要明白一點,在AMQP 0-9-1中,消息的負載均衡是發(fā)生在消費者(consumer)之間的,而不是隊列(queue)之間。
2.1配置類QueueConfig
@Configuration
public class QueueConfig {
/**
* 創(chuàng)建一個隊列 隊列名為direct1
* */
@Bean
public Queue queue01(){
return new Queue("direct1",true);//true表示持久化
}
/**
* 創(chuàng)建一個直連交換機 名為directExchange
* */
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
/**
* 在讓隊列和直連交換機綁定在一起
* */
@Bean
public Binding binding(){
Binding binding= BindingBuilder
.bind(queue01())
.to(directExchange()).with("routingKey01");
return binding;
}
}
2.2消息提供者
@Component
public class MqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void sent_test(Object o){
//convertAndSend(交換機的名字,交換機中路由鍵名稱,參數(shù))
rabbitTemplate.convertAndSend(
"directExchange",//交換機名字
"routingKey01",//路由key
o);
}
}
2.2消息消費者
@Component
@Slf4j
public class MqConsumer {
/**
* 接收消息
*/
@RabbitListener(queues = {"direct1"})
public void receivedD(Message message, Channel channel)throws Exception{
String msg=new String(message.getBody());
log.info("當前時間:{},消費者1收到消息:{}",new Date().toString(),msg);
}
}
我寫了兩個消費者內容一致
2.3測試類
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
@Resource
private MqProducer mqProducer;//注入消息提供者
@Test
public void test_send() throws InterruptedException {
// 循環(huán)發(fā)送消息
while (true) {
mqProducer.sent_test("你好,我是Lottery 001");
Thread.sleep(3500);
}
}
}
測試結果
三、默認交換機default exchange
默認交換機
(default exchange)實際上是
一個由消息代理預先聲明好的沒有名字
(名字為空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對于簡單應用特別有用處:那就是每個新建隊列
(queue)都會自動
綁定到默認交換機
上,綁定的路由鍵(routing key)名稱
與隊列名稱
相同。
3.1配置類和消息提供者
/**
*配置類
*/
@Configuration
public class QueueConfig {
//只需要創(chuàng)建一個隊列
//每個`新建隊列`(queue)都會`自動`綁定到`默認交換機`上,
//綁定的`路由鍵(routing //key)名稱`與`隊列名稱` 相同
@Bean
public Queue queue02(){
return new Queue("def");
}
}
/**
*消息提供者
*/
@Component
public class MqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void def_sent_test(Object obj){
//convertAndSend(交換機的名字,交換機中路由鍵名稱,參數(shù))
rabbitTemplate.convertAndSend(
//沒有名字(名字為空字符串)
"",
"def",
obj);//消息內容
}
}
默認交換機名字是
空字符串
。每個新建隊列
(queue)都會自動
綁定到默認交換機
上,綁定的路由鍵(routing key)名稱
與隊列名稱
相同。
3.2消息消費者
@Component
@Slf4j
public class MqConsumer {
/**
* 接收消息
*/
@RabbitListener(queues = {"def"})
public void receivedD02(Message message, Channel channel)throws Exception{
String msg=new String(message.getBody());
log.info("當前時間:{},消費者收到消息:{}",new Date().toString(),msg);
}
}
3.3測試結果
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
@Resource
private MqProducer mqProducer;//注入消息提供者
@Test
public void test_send02() throws InterruptedException {
// 循環(huán)發(fā)送消息
while (true) {
mqProducer.def_sent_test("測試默認交換機");
Thread.sleep(3500);
}
}
}
四、扇型交換機fanout exchange
扇型交換機(fanout exchange)將消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。如果N個隊列綁定到某個扇型交換機上,當有消息發(fā)送給此扇型交換機時,交換機會將消息的拷貝分別發(fā)送給這所有的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)
這個交換機上的路由鍵將失效
4.1配置類
@Configuration
public class QueueConfig {
/**
* 創(chuàng)建多個隊列
* @return
*/
@Bean
public Queue queue03_1(){
return new Queue("fanout03_1");
}
@Bean
public Queue queue03_2(){
return new Queue("fanout03_2");
}
@Bean
public Queue queue03_3(){
return new Queue("fanout03_3");
}
/**
* 創(chuàng)建一個扇形交換機
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
/**
* 隊列和扇形交換機綁定
*/
@Bean
public Binding binding_3_1(){
Binding binding= BindingBuilder
.bind(queue03_1())
.to(fanoutExchange());
return binding;
}
@Bean
public Binding binding_3_2(){
Binding binding= BindingBuilder
.bind(queue03_2())
.to(fanoutExchange());
return binding;
}
@Bean
public Binding binding_3_3(){
Binding binding= BindingBuilder
.bind(queue03_3())
.to(fanoutExchange());
return binding;
}
}
4.2消息提供者
@Component
public class MqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 扇形交換機
*/
public void fanout_sent_test(Object o){
//convertAndSend(交換機的名字,交換機中路由鍵名稱,參數(shù))
rabbitTemplate.convertAndSend(
"fanoutExchange",
"",//扇形交換機也沒有路由建
o);
}
}
注意:扇形交換機也
沒有路由key
也用空字符串
4.3消息消費者
@Component
@Slf4j
public class MqConsumer {
@RabbitListener(queues = {"fanout03_1"})
public void receivedD03_1(Message message, Channel channel)throws Exception{
String msg=new String(message.getBody());
log.info("綁定隊列一 當前時間:{},消費者收到消息:{}",new Date().toString(),msg);
}
@RabbitListener(queues = {"fanout03_2"})
public void receivedD03_2(Message message, Channel channel)throws Exception{
String msg=new String(message.getBody());
log.info("綁定隊列二 當前時間:{},消費者收到消息:{}",new Date().toString(),msg);
}
@RabbitListener(queues = {"fanout03_3"})
public void receivedD03_3(Message message, Channel channel)throws Exception{
String msg=new String(message.getBody());
log.info("綁定隊列三 當前時間:{},消費者收到消息:{}",new Date().toString(),msg);
}
}
4.4測試類
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
@Resource
private MqProducer mqProducer;//注入消息提供者
@Test
public void test_send03() throws InterruptedException {
int a=1;
// 循環(huán)發(fā)送消息
while (true) {
mqProducer.fanout_sent_test("測試扇形交換機 第"+ a++ +"次循環(huán)");
Thread.sleep(3500);
}
}
}
五、主題交換機topic exchanges
主題交換機(topic exchanges)通過對消息的
路由鍵
和隊列
到交換機的綁定模式之間的匹配,將消息
路由給
一個或多個隊列。主題交換機經常用來實現(xiàn)各種分發(fā)/訂閱模式及其變種。主題交換機通常用來實現(xiàn)消息的多播路由(multicast routing)。
5.1配置類
@Configuration
public class QueueConfig {
/**
* 創(chuàng)建;兩個隊列
*/
@Bean
public Queue topicQueue_1(){
return new Queue("topicQueue_1");
}
@Bean
public Queue topicQueue_2(){
return new Queue("topicQueue_2");
}
/**
* 創(chuàng)建主題交換機
*/
@Bean
public TopicExchange TopicExchange(){
return new TopicExchange("TopicExchange");
}
/**
* 根據(jù)不同的key綁定不同的隊列
*/
@Bean
public Binding bindingTopicExchange_1(){
Binding binding= BindingBuilder
.bind(topicQueue_1())
.to(TopicExchange()).with("key1");
return binding;
}
@Bean
public Binding bindingTopicExchange_2(){
Binding binding= BindingBuilder
.bind(topicQueue_2())
.to(TopicExchange()).with("key2");
return binding;
}
}
5.2消息提供者
@Component
public class MqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 主題交換機
*/
public void topic_sent_test(Object o,String key){
rabbitTemplate.convertAndSend(
"TopicExchange",
key, //后面動態(tài)的傳遞key
o);
}
}
5.3消息消費者
@Component
@Slf4j
public class MqConsumer1 {
/**
* 接收消息
*/
@RabbitListener(queues = {"topicQueue_1"})
public void topicQueue_1(Message message, Channel channel)throws Exception{
String msg=new String(message.getBody());
log.info("隊列一 當前時間:{},消費者收到消息:{}",new Date().toString(),msg);
}
@RabbitListener(queues = {"topicQueue_2"})
public void topicQueue_2(Message message, Channel channel)throws Exception{
String msg=new String(message.getBody());
log.info("隊列二 當前時間:{},消費者收到消息:{}",new Date().toString(),msg);
}
}
5.4測試
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
@Resource
private MqProducer mqProducer;//注入消息提供者
@Test
public void test_send04() throws InterruptedException {
// 循環(huán)發(fā)送消息
int a=1;
while (true) {
if(a%2 == 0){
mqProducer.topic_sent_test("?。〗o隊列二的消息==第"
+ a++ +"次循環(huán)","key2");
}else{
mqProducer.topic_sent_test("?。〗o隊列一的消息==第"
+ a++ +"次循環(huán)","key1");
}
Thread.sleep(3500);
}
}
}
使用案例:
- 分發(fā)有關于特定地理位置的數(shù)據(jù),例如銷售點
- 由多個工作者(workers)完成的后臺任務,每個工作者負責處理某些特定的任務
- 股票價格更新(以及其他類型的金融數(shù)據(jù)更新)
- 涉及到分類或者標簽的新聞更新(例如,針對特定的運動項目或者隊伍)
- 云端的不同種類服務的協(xié)調
- 分布式架構/基于系統(tǒng)的軟件封裝,其中每個構建者僅能處理一個特定的架構或者系統(tǒng)。
六、頭交換機 headers exchange
有時消息的路由操作會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是為此而生的。頭交換機使用多個
消息屬性
來代替
路由鍵建立路由規(guī)則。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規(guī)則。文章來源:http://www.zghlxwxcb.cn/news/detail-580729.html
6.1配置類
@Configuration
public class QueueConfig {
/**
* 創(chuàng)建2個隊列
*/
@Bean(name = "headersQ1")
public Queue queue1() {
return new Queue("headersQ1");
}
@Bean(name = "headersQ2")
public Queue queue2() {
return new Queue("headersQ2");
}
/**
* 創(chuàng)建交換機
* @return
*/
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headersExchange");
}
/**
* 綁定交換機和隊列
*/
@Bean
public Binding binding1() {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue1");
header.put("bindType", "whereAll");
return BindingBuilder
.bind(queue1())
.to(headersExchange())
.whereAll(header).match();
}
@Bean
public Binding binding2() {
HashMap<String, Object> header = new HashMap<>();
header.put("queue", "queue2");
header.put("bindType", "whereAny");
return BindingBuilder
.bind(queue2())
.to(headersExchange())
.whereAny(header).match();
}
}
6.2創(chuàng)建消息提供者
@Component
public class MqProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 頭交換機
* @param msg
*/
public void headers_send(String msg,int a) {
//a用來控制頭信息 達到傳遞給不同的隊列效果
MessageProperties messageProperties = new MessageProperties();
if( a % 3 ==0){
messageProperties.setHeader("queue", "queue2");
messageProperties.setHeader("bindType", "whereAny");
}else{
messageProperties.setHeader("queue", "queue1");
messageProperties.setHeader("bindType", "whereAll");
}
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("headersExchange", null, message);
}
}
6.3消息消費者
@Component
@Slf4j
public class MqConsumer1 {
/**
* 接收消息
*/
@RabbitListener(queues = "headersQ1")
public void receive1(String msg) {
log.info("接收到 headersQ1 發(fā)送的消息:" + msg);
}
@RabbitListener(queues = "headersQ2")
public void receive2(String msg) {
log.info("接收到 headersQ2 發(fā)送的消息:" + msg);
}
}
6、4測試結果
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringRunnerTest {
@Resource
private MqProducer mqProducer;//注入消息提供者
@Test
public void test_headers_send() throws InterruptedException {
// 循環(huán)發(fā)送消息
int a=1;
while (true) {
mqProducer.headers_send("消息"+a,a++);
Thread.sleep(3500);
}
}
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-580729.html
到了這里,關于springboot與rabbitmq的整合【演示5種基本交換機】的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!