一、消費(fèi)端消息可靠性保證:
-
消息確認(rèn)(Acknowledgements):
消費(fèi)者在接收到消息后,默認(rèn)情況下RabbitMQ會自動確認(rèn)消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費(fèi)者在處理完消息后手動發(fā)送確認(rèn)(basicAck)。如果消費(fèi)者在處理過程中發(fā)生異常或者未完成處理就終止運(yùn)行,那么消息在超時時間內(nèi)將不會被刪除,會再次被RabbitMQ投遞給其他消費(fèi)者。
????? 2.死信隊列(Dead Letter Queue):
當(dāng)消息不能被正常消費(fèi)時(比如達(dá)到最大重試次數(shù)),可以通過設(shè)置TTL(Time To Live)或者死信交換器(Dead Letter Exchange)將消息路由至死信隊列,從而有機(jī)會后續(xù)分析和處理這些無法正常消費(fèi)的消息。
二、生產(chǎn)端消息可靠性保證:
-
消息持久化:
當(dāng)生產(chǎn)者發(fā)布消息時,可以選擇將其標(biāo)記為持久化(persistent).這意味著即使 RabbitMQ 服務(wù)器重啟,消息也不會丟失,因為它們會被存儲在磁盤上。
?????? 2.確認(rèn)(Confirm)機(jī)制:
開啟confirm回調(diào)模式后,RabbitMQ會在消息成功寫入到磁盤并至少被一個交換器接受后,向生產(chǎn)者發(fā)送一個確認(rèn)(acknowledgement)。若消息丟失或無法投遞給任何隊列,RabbitMQ將會發(fā)送一個否定確認(rèn)(nack). 生產(chǎn)者可以根據(jù)這些確認(rèn)信號判斷消息是否成功送達(dá)并采取相應(yīng)的重試策略。
RabbitMQ作為消息中間件并啟用publisher confirms(發(fā)布者確認(rèn))與publisher returns(發(fā)布者退回)機(jī)制時,可以確保消息從生產(chǎn)者到交換機(jī)的投遞過程得到更準(zhǔn)確的狀態(tài)反饋。
1.@PostConstruct注解
@PostConstruct注解是Java EE規(guī)范中的一部分,主要用于標(biāo)記在一個Bean初始化完成后需要執(zhí)行的方法。這個注解由JSR-250定義,并且在Spring框架以及其他遵循Java EE標(biāo)準(zhǔn)的應(yīng)用服務(wù)器中廣泛支持。
功能與用途:初始化方法,當(dāng)容器完成對Bean的實例化并且所有依賴注入完成后,將會自動調(diào)用標(biāo)有@PostConstruct
注解的方法。這為開發(fā)者提供了一個機(jī)會,在對象正式投入使用之前進(jìn)行一些必要的初始化工作,比如初始化資源、預(yù)計算某些值、啟動后臺任務(wù)等增強(qiáng)。
2. Publisher Confirms(發(fā)布者確認(rèn))
作用: Publisher Confirm機(jī)制允許RabbitMQ服務(wù)器通知生產(chǎn)者一個消息是否已經(jīng)被交換機(jī)正確接收。當(dāng)publisher-confirm-type設(shè)置為CORRELATED時,RabbitMQ會向生產(chǎn)者發(fā)送確認(rèn)或否定響應(yīng),確認(rèn)消息已到達(dá)交換機(jī),但不保證消息已被路由到至少一個隊列中。
生產(chǎn)者到交換機(jī)的確認(rèn)(消息到達(dá)交換機(jī))
2.1.配置:
spring.rabbitmq.publisher-confirm-type = CORRELATED
2.2. 代碼實現(xiàn)
只要到達(dá)交換機(jī)就會觸發(fā)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 消息成功投遞成功并被確認(rèn)
} else {
// 消息未能正確投遞
}
}
});
3.Publisher Returns(發(fā)布者退回)
作用: Publisher Return機(jī)制用于當(dāng)消息無法按照路由鍵規(guī)則路由到任何隊列時,或者由于其他原因(例如隊列滿、消息過大等)而被交換機(jī)拒絕時,RabbitMQ將消息返回給生產(chǎn)者。
交換機(jī)到隊列的確認(rèn)(消息是否正常發(fā)送到了隊列)
通過實現(xiàn) ReturnCallback 接口,發(fā)送消息失敗返回,比如交換機(jī)路由不到隊列時觸發(fā)回調(diào):
1.只有消息沒有路由到隊列的時候,才觸發(fā)該回調(diào) .文章來源:http://www.zghlxwxcb.cn/news/detail-851017.html
2.只要有一個隊列接受到消息了,它就認(rèn)為成功.文章來源地址http://www.zghlxwxcb.cn/news/detail-851017.html
3.1 配置
spring.rabbitmq.publisher-returns = true
3.2 代碼實現(xiàn)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 處理未被路由或因某種原因被退回的消息
}
});
4.完整代碼
4.1消費(fèi)者
/*
* Copyright (c) 2020, 2024, All rights reserved.
*
*/
package com.by.consumer;
import cn.hutool.core.map.MapUtil;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* <p>Project: spring-boot-rabbitMQ - DirectConsumer</p>
* <p>Powered by scl On 2024-04-07 16:57:20</p>
* <p>描述:<p>
*
* @author 孫臣龍 [1846080280@qq.com]
* @version 1.0
* @since 17
*/
@Configuration
public class ReliabilityConsumer2 {
//注冊隊列
@Bean
public Queue queue1() {
return QueueBuilder.durable("Re_Q01").deadLetterExchange("dead_E01").deadLetterRoutingKey("DK01").build();
}
//注冊交換機(jī)
@Bean
public CustomExchange exchange() {
Map<String, Object> map = MapUtil.of("x-delayed-type", "direct");
return new CustomExchange("Re_E01", "x-delayed-message", true, false, map);
}
//綁定交換機(jī)和隊列
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue1()).to(exchange()).with("RK01").noargs();
}
//注冊一個死信交換機(jī)
@Bean
public DirectExchange deadExchange() {
return new DirectExchange("dead_E01");
}
//注冊一個死信隊列
@Bean
public Queue deadQueue() {
return QueueBuilder.durable("dead_Q01").build();
}
//綁定死信交換機(jī)和死信隊列
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DK01");
}
//啟動一個消費(fèi)者
@RabbitListener(queues = "Re_Q01")
public void receiveMessage(OrderKO msg) {
System.out.println("消費(fèi)者2:" + msg);
}
}
?4.2生產(chǎn)者
/*
* Copyright (c) 2020, 2024, All rights reserved.
*
*/
package com.by.provider;
import com.by.consumer.OrderKO;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.UUID;
/**
* <p>Project: spring-boot-rabbitMQ - DirectProvider</p>
* <p>Powered by scl On 2024-04-07 17:06:41</p>
* <p>描述:<p>
*
* @author 孫臣龍 [1846080280@qq.com]
* @version 1.0
* @since 17
*/
@Service
public class ReliabilityProvider implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
//啟動一個生產(chǎn)者
public void send(OrderKO orderKO) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("callbackSender UUID: " + correlationData.getId());
rabbitTemplate.convertAndSend(
"Re_E01",
"RK01",orderKO,
m-> m,correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b) {
System.out.println("消息發(fā)送成功");
} else {
System.out.println("消息發(fā)送失敗");
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息丟失");
}
}
4.3配置文件
spring.rabbitmq.publisher-confirm-type = CORRELATED
spring.rabbitmq.publisher-returns = true
4.4測試
@Test
void test6() throws InterruptedException, IOException {
for (int i = 1; i <= 5; i++) {
OrderKO orderKO = OrderKO.builder().id(i).name("孫臣龍" + i).build();
System.out.println("發(fā)送消息"+i);
reliabilityProvider.send(orderKO);
}
Thread.sleep(10000);
//System.in.read();
}
到了這里,關(guān)于Rabbitmq怎么保證消息的可靠性?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!