一、發(fā)送端消息確認(rèn)
(1)publish ===> broker
只要broker收到消息,就會執(zhí)行 confirmCallback
# 開啟發(fā)送端消息抵達Broker確認(rèn)
spring.rabbitmq.publisher-confirms=true
(2)exchange ===> queue
如果exchange有消息沒有成功發(fā)送至queue,就會執(zhí)行RuturnCallback,例:routing key錯誤導(dǎo)致發(fā)送消息到隊列失敗
# 開啟發(fā)送端消息抵達Queue確認(rèn)
spring.rabbitmq.publisher-returns=true
# 只要消息抵達Queue,就會異步發(fā)送優(yōu)先回調(diào)returnfirm
spring.rabbitmq.template.mandatory=true
(3)RabbitmqConfig
/**
* 定制RabbitTemplate
* 1、服務(wù)收到消息就會回調(diào)
* 1、spring.rabbitmq.publisher-confirms: true
* 2、設(shè)置確認(rèn)回調(diào)
* 2、消息正確抵達隊列就會進行回調(diào)
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、設(shè)置確認(rèn)回調(diào)ReturnCallback
*
* 3、消費端確認(rèn)(保證每個消息都被正確消費,此時才可以broker刪除這個消息)
*
*/
// @PostConstruct //MyRabbitConfig對象創(chuàng)建完成以后,執(zhí)行這個方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵達Broker就ack=true
* correlationData:當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)(這個是消息的唯一id)
* ack:消息是否成功收到
* cause:失敗的原因
*/
//設(shè)置確認(rèn)回調(diào)
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息沒有投遞給指定的隊列,就觸發(fā)這個失敗回調(diào)
* message:投遞失敗的消息詳細(xì)信息
* replyCode:回復(fù)的狀態(tài)碼
* replyText:回復(fù)的文本內(nèi)容
* exchange:當(dāng)時這個消息發(fā)給哪個交換機
* routingKey:當(dāng)時這個消息用哪個路郵鍵
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
二、消費端消息確認(rèn)
(1) queue ===> consumer
默認(rèn)是ack,consumer只要拿到消息就會自動確認(rèn),服務(wù)端就會刪除queue中的消息,如果業(yè)務(wù)出現(xiàn)問題只有部分消息簽收成功,剩余未簽收的消息也會刪除,為了能保存消息,需要設(shè)置為客戶端手動確認(rèn)簽收
#手動簽收ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
(2) 簽收消息channel.basicAck(自增id,是否批量)
設(shè)置手動ack簽收之后,如果有消息沒有簽收(簽收失?。?,會顯示未簽收
自增id:2,簽收完成2
自增id:4,簽收完成4
如果服務(wù)重啟,未簽收的消息就會加入queue重新發(fā)送:Unacked ===> Ready
(3)拒簽 channel.basicNack(自增id,是否批量,是否重新加入隊列)
requeue=true 拒簽的消息會加入queue重新發(fā)送
requeue=false 拒簽的消息會直接丟棄,不會加入隊列重新發(fā)送
@RabbitListener(queues = {"durunwu.queue"})
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {
@RabbitHandler
public void msgListener2(Message message, OrderItemEntity orderItemEntity, Channel channel) throws IOException {
//順序id,channel內(nèi)按順序自增了
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (deliveryTag % 2 == 0){
//basicAck(long deliveryTag, boolean multiple)
//channel內(nèi)按順序自增
channel.basicAck(deliveryTag,false);
System.out.println("自增id:"+deliveryTag+",簽收完成" + orderItemEntity.getId());
}else {
//basicNack(long deliveryTag, boolean multiple, boolean requeue)
channel.basicNack(deliveryTag,false,true);
System.out.println("自增id:"+deliveryTag+",拒簽"+orderItemEntity.getId());
}
}
}
控制臺打印文章來源:http://www.zghlxwxcb.cn/news/detail-637645.html
自增id:1,拒簽1
自增id:2,簽收完成2
自增id:3,拒簽3
自增id:4,簽收完成4
自增id:5,拒簽5
自增id:6,簽收完成1
自增id:7,拒簽3
自增id:8,簽收完成5
自增id:9,拒簽3
自增id:10,簽收完成3
(4)整合業(yè)務(wù)
如何簽收?
業(yè)務(wù)完成 ===> 簽收:basicAck(deliveryTag, false)
業(yè)務(wù)未完成 ===> 拒簽:basicNack(deliveryTag, false, true)文章來源地址http://www.zghlxwxcb.cn/news/detail-637645.html
到了這里,關(guān)于rabbitmq消息確認(rèn)機制的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!