RabbitMq生產(chǎn)者發(fā)送消息失敗現(xiàn)象
一般情況下RabbitMq的生產(chǎn)者能夠正常的把消息投遞到交換機(jī)Exchange,Exchange能夠根據(jù)路由鍵routingKey把消息投遞到隊(duì)列Queue,但是一旦出現(xiàn)消息無(wú)法投遞到交換機(jī)Exchange,或無(wú)法路由到Queue的這種特殊情況下,則需要對(duì)生產(chǎn)者的消息進(jìn)行緩存或者保存到數(shù)據(jù)庫(kù),后續(xù)在調(diào)查完RabbitMq服務(wù)器的問(wèn)題之后,待RabbitMq服務(wù)器正常之后,需要對(duì)這些消息進(jìn)行重新投遞。正常來(lái)說(shuō)RabbitMq做了集群之后是不會(huì)出現(xiàn)這種問(wèn)題,整個(gè)集群掛斷的概率也是非常小。

錯(cuò)誤信息
當(dāng)項(xiàng)目啟動(dòng)后,然后把交換機(jī)Exchange刪除后,然后生產(chǎn)者發(fā)送消息時(shí)會(huì)提示交換機(jī)不存在。Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
SpringBoot代碼示例
SpringBoot的application.properties需要新增spring.rabbitmq.publisher-confirm-type配置要求值是correlated。默認(rèn)值是none表示無(wú)需觸發(fā)交換機(jī)收到消息的回調(diào)接口。correlated表示消息發(fā)布后會(huì)觸發(fā)交換機(jī)收到消息的回調(diào)接口。
# springboot整合rabbitMq的配置
spring.rabbitmq.host=192.168.15.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
隊(duì)列和交換機(jī)配置類(lèi)
package springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ConfirmConfig {
// 普通交換機(jī)名稱(chēng)
public static final String EXCHANGE_NAME = "confirm_exchange";
// 隊(duì)列名稱(chēng)
public static final String QUEUE_NAME = "confirm_queue";
public static final String ROUTING_KEY = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
}
}
生產(chǎn)者消息發(fā)送確認(rèn)配置類(lèi)
package springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
// 1.先實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口,從寫(xiě)confirm回調(diào)函數(shù)
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback {
// 2.注入
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
* @param correlationData 消息
* @param b 發(fā)送成功是true,失敗是false
* @param s 發(fā)送失敗時(shí)的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("交換機(jī)已經(jīng)收到id為{}的消息", id);
} else {
log.error("交換機(jī)未收到id為{}的消息, 原因是:{}", id, s);
// 消息緩存或入庫(kù),郵件提醒運(yùn)維
}
}
// 3.然后在springBoot對(duì)象初始化之后再執(zhí)行rabbitTemplate.setConfirmCallback(this);設(shè)置回調(diào)函數(shù),避免使用默認(rèn)的ConfirmCallback
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
}
生產(chǎn)者類(lèi)
package springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springbootrabbitmq.config.ConfirmConfig;
import springbootrabbitmq.config.TtlQueueConfig;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable String message) {
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條消息:{} 到隊(duì)列", new Date().toString(), message);
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message);
return "success";
}
@GetMapping("/sendMsg2/{message}")
public String sendMsg2(@PathVariable String message) {
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條消息:{} 到隊(duì)列", new Date().toString(), message);
CorrelationData data = new CorrelationData();
data.setId("1111");
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
return "success";
}
}
消費(fèi)者類(lèi)
package springbootrabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import springbootrabbitmq.config.ConfirmConfig;
import java.util.Date;
@Component
@Slf4j
public class ConfirmConsumer {
//監(jiān)聽(tīng)器接收消息
@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{}, 收到一條消息:{} ", new Date().toString(), msg);
}
}
首先正常發(fā)送,然后再刪除交換機(jī)然后再發(fā)送。測(cè)試結(jié)果如下
2023-01-29 21:07:12.367 INFO 79848 --- [nio-8080-exec-1] s.controller.ConfirmController : 當(dāng)前時(shí)間:Sun Jan 29 21:07:12 CST 2023, 發(fā)送一條消息:12 到隊(duì)列
2023-01-29 21:07:12.399 INFO 79848 --- [nectionFactory1] s.config.RabbitMqCallBack : 交換機(jī)已經(jīng)收到id為1111的消息
2023-01-29 21:07:12.403 INFO 79848 --- [ntContainer#0-1] s.consumer.ConfirmConsumer : 當(dāng)前時(shí)間:Sun Jan 29 21:07:12 CST 2023, 收到一條消息:12
2023-01-29 21:08:01.282 INFO 79848 --- [nio-8080-exec-2] s.controller.ConfirmController : 當(dāng)前時(shí)間:Sun Jan 29 21:08:01 CST 2023, 發(fā)送一條消息:123 到隊(duì)列
2023-01-29 21:08:01.289 ERROR 79848 --- [168.15.200:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
2023-01-29 21:08:01.290 ERROR 79848 --- [nectionFactory2] s.config.RabbitMqCallBack : 交換機(jī)未收到id為1111的消息, 原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
消息回退
如果不開(kāi)啟消息回退,默認(rèn)是消息即使無(wú)法發(fā)送到隊(duì)列(如路由鍵錯(cuò)誤等場(chǎng)景),也不會(huì)進(jìn)行提醒,生產(chǎn)者不知道消息能否成功發(fā)送到隊(duì)列。
解決方案
當(dāng)消息無(wú)法到達(dá)隊(duì)列的時(shí)候進(jìn)行提醒
消息回退代碼示例
配置,開(kāi)啟消息不可達(dá)目的地時(shí)的回調(diào)
spring.rabbitmq.publisher-returns=true
配置類(lèi),實(shí)現(xiàn)RabbitTemplate.ReturnCallback接口
package springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
// 1.先實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口,從寫(xiě)confirm回調(diào)函數(shù)
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
// 2.注入
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
* @param correlationData 消息
* @param b 發(fā)送成功是true,失敗是false
* @param s 發(fā)送失敗時(shí)的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("交換機(jī)已經(jīng)收到id為{}的消息", id);
} else {
log.error("交換機(jī)未收到id為{}的消息, 原因是:{}", id, s);
// 消息緩存或入庫(kù),郵件提醒運(yùn)維
}
}
// 3.然后在springBoot對(duì)象初始化之后再執(zhí)行rabbitTemplate.setConfirmCallback(this);設(shè)置回調(diào)函數(shù),避免使用默認(rèn)的ConfirmCallback
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
// 當(dāng)消息傳遞過(guò)程中不可達(dá)到目的地時(shí)將消息返回給生產(chǎn)者,只有不可達(dá)到目的地時(shí)才會(huì)調(diào)用這個(gè)方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息無(wú)法被寫(xiě)入隊(duì)列:{}, 退回原因:{}, 路由Key: {}", message, replyText, routingKey);
// 郵件發(fā)送,緩存或存到數(shù)據(jù)庫(kù)
}
}
生產(chǎn)者
@GetMapping("/sendMsg3/{message}")
public String sendMsg3(@PathVariable String message) {
log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條消息:{} 到隊(duì)列", new Date().toString(), message);
CorrelationData data = new CorrelationData();
data.setId("1111");
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY+"222", message +"222", data);
return "success";
}
消費(fèi)者與上一個(gè)消費(fèi)者相同文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-757698.html
測(cè)試結(jié)果如下:調(diào)用:http://127.0.0.1:8080/confirm/sendMsg3/123生產(chǎn)者的接口可以看到當(dāng)路由鍵錯(cuò)誤導(dǎo)致交換機(jī)無(wú)法把消息投遞到隊(duì)列時(shí)會(huì)回調(diào)returnedMessage方法。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-757698.html
2023-01-29 21:27:48.910 INFO 74512 --- [nio-8080-exec-1] s.controller.ConfirmController : 當(dāng)前時(shí)間:Sun Jan 29 21:27:48 CST 2023, 發(fā)送一條消息:123 到隊(duì)列
2023-01-29 21:27:48.934 INFO 74512 --- [nectionFactory1] s.config.RabbitMqCallBack : 交換機(jī)已經(jīng)收到id為1111的消息
2023-01-29 21:27:48.941 ERROR 74512 --- [nectionFactory1] s.config.RabbitMqCallBack : 消息無(wú)法被寫(xiě)入隊(duì)列:(Body:'123222' MessageProperties [headers={spring_returned_message_correlation=1111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), 退回原因:NO_ROUTE, 路由Key: key1222
2023-01-29 21:27:48.943 INFO 74512 --- [nectionFactory2] s.config.RabbitMqCallBack : 交換機(jī)已經(jīng)收到id為1111的消息
2023-01-29 21:27:48.946 INFO 74512 --- [ntContainer#0-1] s.consumer.ConfirmConsumer : 當(dāng)前時(shí)間:Sun Jan 29 21:27:48 CST 2023, 收到一條消息:123
到了這里,關(guān)于RabbitMq生產(chǎn)者發(fā)送消息確認(rèn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!