提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔
前言
在使用rabbitmq時,會因為各種原因(網(wǎng)絡波動,系統(tǒng)宕機,程序異常等)導致消息發(fā)送失敗。rabbitmq也提供了相應的處理機制。
提示:以下是本篇文章正文內容,下面案例可供參考
一、rabbitmq消息發(fā)送失敗處理機制
生產(chǎn)法發(fā)送失敗
配置回調器。
yml配置開啟確認和返回機制
confirm:發(fā)送給exchange時的回調,不管是否成功發(fā)送給隊列。
return:消息沒有發(fā)送給exchange時的回調。
#成功發(fā)送到exchange時的回調
spring.rabbitmq.publisher-confirm-type=correlated
#exchange未發(fā)送到隊列時回調
spring.rabbitmq.publisher-returns=true
回調函數(shù)配置方式
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
/**
* 交換機不管是否收到消息的一個回調方法
*
* @param correlationData 消息相關數(shù)據(jù)
* @param ack 交換機是否收到消息
* @param cause 未收到消息的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交換機已經(jīng)收到 id 為:{}的消息", id);
} else {
//重發(fā)處理,可以入庫,死信隊列處理....
log.info("交換機還未收到 id 為:{}消息,原因:{}", id, cause);
}
}
//當消息無法路由的時候觸發(fā)回調方法
@Override
public void returnedMessage(ReturnedMessage returned) {
//重發(fā)處理,可以入庫,死信隊列處理....
log.error("消息:{},被交換機 {} 退回,原因:{},路由key:{},code:{}",
new String(returned.getMessage().getBody()), returned.getExchange(),
returned.getReplyText(), returned.getRoutingKey(),
returned.getReplyCode());
}
}
rabbitmqTemplate在啟動時注入:
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitTemplate.ConfirmCallback confirmCallback;
@Autowired
private RabbitTemplate.ReturnsCallback returnsCallback;
//依賴注入 rabbitTemplate 之后再設置它的回調對象
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnsCallback);
}
消費者消費失敗后處理
通過自動ack+retry配置+私信隊列方式實現(xiàn)
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
# acknowledge-mode: manual # 配置該消費者的ack方式為手動
acknowledge-mode: auto # 配置該消費者的ack方式為自動
default-requeue-rejected: false
#設置消費失敗后重發(fā)
retry:
#重發(fā)次數(shù)
max-attempts: 3
#開啟重發(fā)
enabled: true
# 重試間隔(ms)
initial-interval: 5000
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME1, durable = “true”, autoDelete = “false”,
arguments = {@Argument(name = “x-dead-letter-exchange”, value = “dead-exchange”),
@Argument(name = “x-dead-letter-routing-key”, value = “dead-routing-key”),
@Argument(name = “x-message-ttl”, value = “1000”,type = “java.lang.Long”)
}),
exchange = @Exchange(value = “first_exchange”, type = ExchangeTypes.DIRECT),
key = “queue_one_key1”))
public void handleMessage1(Message message, Channel channel) throws IOException {
log.info(“OrderConsumer handleMessage {} , error:”, message);
//模擬消費異常,自動進入私信隊列
throw new RuntimeException(“拋出異常,模擬消費失敗,觸發(fā)spring-retry”);
}
/**
* 死信隊列消費者
*
* @param data
* @param channel
* @throws Exception
*/
@RabbitListener(queues = "dead-queue")
public void consumeDL(String data, Channel channel) throws Exception {
//處理消費失敗的消息
log.info(">>>> 死信隊列消費 tag = {},消息內容 : {}", data);
// channel.basicNack(tag, false, false);
}
通過手動ack+私信隊列實現(xiàn)(不要配置retry!!!)文章來源:http://www.zghlxwxcb.cn/news/detail-406881.html
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(String data, Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("接受到隊列 confirm.queue 消息:{}", msg);
// throw new RuntimeException("拋出異常,模擬消費失敗,觸發(fā)spring-retry");
//模擬消費失敗,重發(fā)n次后仍然失敗。調用basicNack 拋給私信隊列處理
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當前時間:{},收到死信隊列信息{}", new Date().toString(), msg);
}
總結
rabbitmq消息失敗處理需要謹慎對待,因為容易產(chǎn)生資源消耗殆盡的問題?。?!文章來源地址http://www.zghlxwxcb.cn/news/detail-406881.html
到了這里,關于rabbitmq消息異常處理的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!