RabbitMQ中的路由模式(Direct模式)應該是在實際工作中運用的比較多的一種模式了,這個模式和發(fā)布與訂閱模式的區(qū)別在于路由模式需要有一個routingKey,在配置上,交換機類型需要注入DirectExchange類型的交換機bean對象。在交換機和隊列的綁定過程中,綁定關系需要在綁定一個路由key。由于在實際的工作中不大可能會用自動確認的模式,所以我們在整合路由模式的過程中,依然采用發(fā)送消息雙確認機制和消費端手動確認的機制來保證消息的準確送達與消息防丟失。
1. 添加配置
在配置文件中,配置rabbitmq的相關賬號信息,開啟消息發(fā)送回調機制,配置文件其實和發(fā)布訂閱模式是一樣的。配置詳情如下:
server:
port: 10001
spring:
application:
name: springboot-rabbitmq-s1
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
# 發(fā)送者開啟 return 確認機制
publisher-returns: true
# 發(fā)送者開啟 confirm 確認機制
publisher-confirm-type: correlated
2. 創(chuàng)建配置類
創(chuàng)建配置類RabbitMQConfig,用于聲明交換機、隊列,建立隊列和交換機的綁定關系,注入RabbitTemplate的bean對象。配置類詳情如下:
package com.study.rabbitmq.config;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author alen
* @DATE 2022/6/7 23:50
*/
@Slf4j
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "direct-order-exchange";
public static final String SMS_QUEUE = "sms-direct-queue";
public static final String EMAIL_QUEUE = "email-direct-queue";
public static final String WECHAT_QUEUE = "wechat-direct-queue";
/**
* 1.
* 聲明交換機
* @return
*/
@Bean
public DirectExchange directExchange() {
/**
* directExchange的參數(shù)說明:
* 1. 交換機名稱
* 2. 是否持久化 true:持久化,交換機一直保留 false:不持久化,用完就刪除
* 3. 是否自動刪除 false:不自動刪除 true:自動刪除
*/
return new DirectExchange(EXCHANGE_NAME, true, false);
}
/**
* 2.
* 聲明隊列
* @return
*/
@Bean
public Queue smsQueue() {
/**
* Queue構造函數(shù)參數(shù)說明
* 1. 隊列名
* 2. 是否持久化 true:持久化 false:不持久化
*/
return new Queue(SMS_QUEUE, true);
}
@Bean
public Queue emailQueue() {
return new Queue(EMAIL_QUEUE, true);
}
@Bean
public Queue wechatQueue() {
return new Queue(WECHAT_QUEUE, true);
}
/**
* 3.
* 隊列與交換機綁定
*/
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
@Bean
public Binding wechatBinding() {
return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
}
/**
* 將自定義的RabbitTemplate對象注入bean容器
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//設置開啟消息推送結果回調
rabbitTemplate.setMandatory(true);
//設置ConfirmCallback回調
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("==============ConfirmCallback start ===============");
log.info("回調數(shù)據(jù):{}", correlationData);
log.info("確認結果:{}", ack);
log.info("返回原因:{}", cause);
log.info("==============ConfirmCallback end =================");
}
});
//設置ReturnCallback回調
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("==============ReturnCallback start ===============");
log.info("發(fā)送消息:{}", JSONUtil.toJsonStr(message));
log.info("結果狀態(tài)碼:{}", replyCode);
log.info("結果狀態(tài)信息:{}", replyText);
log.info("交換機:{}", exchange);
log.info("路由key:{}", routingKey);
log.info("==============ReturnCallback end =================");
}
});
return rabbitTemplate;
}
}
3. 消費者配置
在消費者項目的配置文件中開啟手動確認,配置詳情如下:
server:
port: 10002
spring:
application:
name: springboot-rabbitmq-s2
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
# 表示消費者消費成功消息以后需要手工的進行簽收(ack確認),默認為 auto
acknowledge-mode: manual
4. 創(chuàng)建消費者
分別創(chuàng)建三個消費者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer來監(jiān)聽對應的隊列,有消息后進行消費,三個消費者大同小異,分別如下
4.1 DirectEmailConsumer
package com.study.rabbitmq.service.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @Author alen
* @DATE 2022/6/10 22:54
*/
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //監(jiān)聽隊列
public class DirectEmailConsumer {
//標記消費者邏輯執(zhí)行方法
@RabbitHandler
public void emailMessage(String msg, Channel channel, Message message) throws IOException {
try {
log.info("Email direct --接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重復處理失敗,拒絕再次接收...");
//basicReject: 拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似 false表示消息不再重新進入隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
} else {
log.error("消息即將再次返回隊列處理...");
// basicNack:表示失敗確認,一般在消費消息業(yè)務異常時用到此方法,可以將消息重新投遞入隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
4.2 DirectSmsConsumer
package com.study.rabbitmq.service.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @Author alen
* @DATE 2022/6/10 22:55
*/
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //監(jiān)聽隊列
public class DirectSmsConsumer {
@RabbitHandler
public void smsMessage(String msg, Channel channel, Message message) throws IOException {
try {
log.info("sms direct --接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重復處理失敗,拒絕再次接收...");
//basicReject: 拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似 false表示消息不再重新進入隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
} else {
log.error("消息即將再次返回隊列處理...");
// basicNack:表示失敗確認,一般在消費消息業(yè)務異常時用到此方法,可以將消息重新投遞入隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
4.3 DirectWechatConsumer
package com.study.rabbitmq.service.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @Author chaoxian.wu
* @DATE 2022/6/10 22:55
*/
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //監(jiān)聽隊列
public class DirectWechatConsumer {
@RabbitHandler
public void wechatlMessage(String msg, Channel channel, Message message) throws IOException {
try {
log.info("wechat direct --接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重復處理失敗,拒絕再次接收...");
//basicReject: 拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似 false表示消息不再重新進入隊列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
} else {
log.error("消息即將再次返回隊列處理...");
// basicNack:表示失敗確認,一般在消費消息業(yè)務異常時用到此方法,可以將消息重新投遞入隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
以上就是全部的代碼部分,接下來我們在進入測試,看看實際效果如何,先發(fā)布一個routingKey=sms的消息,查看是不是只有對應的一個隊列中接收到消息,消息發(fā)送詳情:
package com.study.rabbitmq;
import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
for (long i = 1; i < 2; i++) {
//交換機名稱
String exchangeName = "direct-order-exchange";
//路由key
String routingKey = "sms";
Order order = buildOrder(i);
orderService.createOrder(order, routingKey, exchangeName);
}
}
private Order buildOrder(long id) {
Order order = new Order();
order.setRequestId(id);
order.setUserId(id);
order.setOrderNo(UUID.randomUUID().toString());
order.setAmount(10L);
order.setGoodsNum(1);
order.setTotalAmount(10L);
return order;
}
}
我們登錄rabbitmq管理后臺查看下,只有sms-direct-queue這個隊列有一條消息,效果如下:
我們啟動消費者,看下是不是只有監(jiān)聽了sms-direct-queue這個隊列的消費者有消費日志,效果如下:
再發(fā)一條routingKey=email的消息,消費的日志,效果圖示如下
到此其實已經(jīng)springboot整合rabbitmq的路由模式結束了,這種模式在工作中還是比較常見的,我們演示的是單點的效果,實際工作中,不大可能會使用服務單點部署,現(xiàn)在都講究服務的高可用,就得服務集群部署,又會涉及到消息重復消費的問題需要處理,我個人覺得,遇到重復消費問題,我第一時間想到的就是分布式鎖,哈哈~。但是鎖什么呢?肯定是消息中的具備唯一性的屬性。來達到防止消息的重復消費。
整個過程中,其實還存在一個小問題沒有驗證,就是ReturnCallback回調機制沒有觸發(fā),因為這個得發(fā)生在交換機將消息發(fā)送到隊列的時候失敗才會觸發(fā),那么我們就發(fā)送一個不存在的routingKey就可以觸發(fā)了,我們發(fā)送一個routingKey=duanxin的消息,這個肯定不會發(fā)送成功,我們通過斷點來看看效果,效果如下:
文章來源:http://www.zghlxwxcb.cn/news/detail-681763.html
然后我們常見的就全部整合完成了,當然,開啟了雙確認機制,雖然我們可以檢測到消息投送的結果,然后可以針對投送失敗的結果進行預警。但是開啟了這個操作,就必然會對消息的處理效率產(chǎn)生影響。所以還得根據(jù)實際業(yè)務場景而定是否需要使用這個確認機制。文章來源地址http://www.zghlxwxcb.cn/news/detail-681763.html
到了這里,關于Spring Boot整合RabbitMQ之路由模式(Direct)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!