消息隊(duì)列概念:是在消息的傳輸過程中保存消息的容器。
作用:異步處理、應(yīng)用解耦、流量控制.....
RabbitMQ:
?
?
SpringBoot繼承RabbitMQ步驟:
? ? ? ? 1.加入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
? ? ? ? ?2.配置
spring:
rabbitmq:
host: 192.168.127.129
virtual-host: / # 指定虛擬主機(jī)
port: 5672
? ? ? ? 3.開啟(如果不需要監(jiān)聽消息也就是不消費(fèi)就不需要該注解開啟)
@EnableRabbit
? ? ? ? 4.創(chuàng)建隊(duì)列、交換機(jī)、以及綁定它們之間的關(guān)系
? ?
@Configuration
public class MyMQConfig {
/**
* 創(chuàng)建隊(duì)列
* @return
*/
@Bean
public Queue createQueue(){
//String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
Queue queue = new Queue("order.queue",true,false,false);
return queue;
}
/**
* 創(chuàng)建交換機(jī)
* @return
*/
@Bean
public Exchange createExchange(){
//因?yàn)檫@個交換機(jī)需要根據(jù)路由進(jìn)行發(fā)送 所以使用TopicExchange
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);
return topicExchange;
}
/**
* 通過路由綁定交換機(jī)和隊(duì)列之間的關(guān)系
* @return
*/
@Bean
public Binding createBinding(){
//String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments
Binding binding = new Binding("order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.route",
null
);
return binding;
}
}
? ? ? ? 4.發(fā)送消息
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@GetMapping("/sendmq")
public String sendmq(){
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
//發(fā)送消息 String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData
rabbitTemplate.convertAndSend("order-event-exchange","order.route",orderEntity);
return "ok";
}
? ? ? ? 5.消費(fèi)消息(監(jiān)聽消息)
@Component
@RabbitListener(queues = "create.queue")
public class OrderCloseListener {
@RabbitHandler
public void orderClose(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
System.out.println("消費(fèi)消息");
}
}
問題1:以上消息發(fā)送和消費(fèi)中,如果傳輸?shù)臄?shù)據(jù)是java對象,默認(rèn)使用的jdk序列化機(jī)制,我們經(jīng)常需要使用json傳遞就需要修改傳輸格式j(luò)son
修改方法如下:
@Configuration
public class RabbitConfig {
//發(fā)送消息為對象的時候 使用json的格式
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
問題2:在消息的發(fā)送和消費(fèi)還有消息儲存過程中,我們需要保證消息的可靠性,避免消息的丟失保證業(yè)務(wù)數(shù)據(jù)的正確
? ? ? ? 1.消息儲存:使用持久化
? ? ? ? 1.消息發(fā)送:開啟消息投靠確認(rèn)機(jī)制
spring:
rabbitmq:
host: 192.168.127.129
virtual-host: / # 指定虛擬主機(jī)
port: 5672
# publisher-confirms: true
publisher-confirm-type: simple # 開啟生產(chǎn)者消息確認(rèn)模式
publisher-returns: true
@Configuration
public class RabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 定制rabbitTemplate
* 消息發(fā)送確認(rèn)
*/
@PostConstruct //表示RabbitConfig對象創(chuàng)建之后執(zhí)行該方法
public void initRabbitTemplate(){
//消息成功發(fā)送到服務(wù)器之后的成功回調(diào)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 消息的唯一id
* @param b 消息是否成功
* @param s 消息失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm===correlationData:"+ correlationData+ "ack:"+ b);
}
});
//消息發(fā)送到隊(duì)列queue失敗執(zhí)行的回調(diào)
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息的內(nèi)容
* @param i 回復(fù)的狀態(tài)碼
* @param s 回復(fù)的文本內(nèi)容
* @param s1 那個交換機(jī)
* @param s2 那個路由key
*
* 最常見的就是路由key不對
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("fail====>message:"+ message+"狀態(tài)碼:"+i + "錯誤提示:"+ s+ "交換機(jī):"+s1 + "路由:"+ s2);
}
});
}
}
異常操作之后可以達(dá)到消息發(fā)送端確認(rèn)機(jī)制
? ? ? ? 3.消息消費(fèi)端的確認(rèn)機(jī)制
spring:
rabbitmq:
host: 192.168.127.129
virtual-host: / # 指定虛擬主機(jī)
port: 5672
# publisher-confirms: true
publisher-confirm-type: simple # 開啟生產(chǎn)者消息確認(rèn)模式
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual # 開啟消費(fèi)者 手動簽收消息功能
@Service
@RabbitListener(queues = "create.queue")
public class OrderCloseListener {
@RabbitHandler
public void orderClose(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
System.out.println("消費(fèi)消息。。。.");
try{
//業(yè)務(wù)邏輯
//手動確認(rèn)消息消費(fèi)成功,消息不在寫人隊(duì)列
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
//消息消費(fèi)失敗(業(yè)務(wù)失?。瑢⑾⒃诖螌懙疥?duì)列避免消息丟失
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
通過以上消息的發(fā)送和消費(fèi)端都確認(rèn)之后我們消息一定的是可靠的。
案例:文章來源:http://www.zghlxwxcb.cn/news/detail-668436.html
? ? ? ? 在實(shí)際的開發(fā)中我們經(jīng)常會有取消訂單的功能,就可以使用消息隊(duì)列延遲消費(fèi)消息,具體實(shí)現(xiàn)通過個死信隊(duì)列,把消息先放到死信隊(duì)列,當(dāng)消息到期之后轉(zhuǎn)到到期隊(duì)列,監(jiān)聽到期隊(duì)列然后達(dá)到訂單取消功能文章來源地址http://www.zghlxwxcb.cn/news/detail-668436.html
到了這里,關(guān)于MQ消息隊(duì)列(主要介紹RabbitMQ)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!