面試題:Rebbitmq怎么保證消息的可靠性
1,消費(fèi)的可靠性保證:
-
消息確認(rèn)(Acknowledgements):
消費(fèi)者在接收到消息后,默認(rèn)情況下RabbitMQ會自動(dòng)確認(rèn)消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費(fèi)者在處理完消息后手動(dòng)發(fā)送確認(rèn)(basicAck)。如果消費(fèi)者在處理過程中發(fā)生異?;蛘呶赐瓿商幚砭徒K止運(yùn)行,那么消息在超時(shí)時(shí)間內(nèi)將不會被刪除,會再次被RabbitMQ投遞給其他消費(fèi)者。
2.死信隊(duì)列(Dead Letter Queue):
當(dāng)消息不能被正常消費(fèi)時(shí)(比如達(dá)到最大重試次數(shù)),可以通過設(shè)置TTL(Time To Live)或者死信交換器(Dead Letter Exchange)將消息路由至死信隊(duì)列,從而有機(jī)會后續(xù)分析和處理這些無法正常消費(fèi)的消息。
2,生產(chǎn)端消息可靠性保證:
-
消息持久化:
當(dāng)生產(chǎn)者發(fā)布消息時(shí),可以選擇將其標(biāo)記為持久化(persistent).這意味著即使 RabbitMQ 服務(wù)器重啟,消息也不會丟失,因?yàn)樗鼈儠淮鎯υ诖疟P上。
-
確認(rèn)(Confirm)機(jī)制:
開啟confirm回調(diào)模式后,RabbitMQ會在消息成功寫入到磁盤并至少被一個(gè)交換器接受后,向生產(chǎn)者發(fā)送一個(gè)確認(rèn)(acknowledgement)。若消息丟失或無法投遞給任何隊(duì)列,RabbitMQ將會發(fā)送一個(gè)否定確認(rèn)(nack). 生產(chǎn)者可以根據(jù)這些確認(rèn)信號判斷消息是否成功送達(dá)并采取相應(yīng)的重試策略。
RabbitMQ作為消息中間件并啟用publisher confirms(發(fā)布者確認(rèn))與publisher returns(發(fā)布者退回)機(jī)制時(shí),可以確保消息從生產(chǎn)者到交換機(jī)的投遞過程得到更準(zhǔn)確的狀態(tài)反饋。
1.@PostConstruct注解
@PostConstruct注解是Java EE規(guī)范中的一部分,主要用于標(biāo)記在一個(gè)Bean初始化完成后需要執(zhí)行的方法。這個(gè)注解由JSR-250定義,并且在Spring框架以及其他遵循Java EE標(biāo)準(zhǔn)的應(yīng)用服務(wù)器中廣泛支持。
功能與用途:初始化方法,當(dāng)容器完成對Bean的實(shí)例化并且所有依賴注入完成后,將會自動(dòng)調(diào)用標(biāo)有@PostConstruct
注解的方法。這為開發(fā)者提供了一個(gè)機(jī)會,在對象正式投入使用之前進(jìn)行一些必要的初始化工作,比如初始化資源、預(yù)計(jì)算某些值、啟動(dòng)后臺任務(wù)等增強(qiáng)。
2. Publisher Confirms(發(fā)布者確認(rèn))
作用: Publisher Confirm機(jī)制允許RabbitMQ服務(wù)器通知生產(chǎn)者一個(gè)消息是否已經(jīng)被交換機(jī)正確接收。當(dāng)publisher-confirm-type設(shè)置為CORRELATED時(shí),RabbitMQ會向生產(chǎn)者發(fā)送確認(rèn)或否定響應(yīng),確認(rèn)消息已到達(dá)交換機(jī),但不保證消息已被路由到至少一個(gè)隊(duì)列中。
生產(chǎn)者到交換機(jī)的確認(rèn)(消息到達(dá)交換機(jī))
配置
spring.rabbitmq.publisher-confirm-type = CORRELATED
代碼實(shí)現(xiàn)
只要到達(dá)交換機(jī)就會觸發(fā)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { // 消息成功投遞成功并被確認(rèn) } else { // 消息未能正確投遞 } } });
3.Publisher Returns(發(fā)布者退回)
作用: Publisher Return機(jī)制用于當(dāng)消息無法按照路由鍵規(guī)則路由到任何隊(duì)列時(shí),或者由于其他原因(例如隊(duì)列滿、消息過大等)而被交換機(jī)拒絕時(shí),RabbitMQ將消息返回給生產(chǎn)者。
交換機(jī)到隊(duì)列的確認(rèn)(消息是否正常發(fā)送到了隊(duì)列)
通過實(shí)現(xiàn) ReturnCallback 接口,發(fā)送消息失敗返回,比如交換機(jī)路由不到隊(duì)列時(shí)觸發(fā)回調(diào):
1.只有消息沒有路由到隊(duì)列的時(shí)候,才觸發(fā)該回調(diào) .
2.只要有一個(gè)隊(duì)列接受到消息了,它就認(rèn)為成功.
配置
spring.rabbitmq.publisher-returns = true
代碼實(shí)現(xiàn)文章來源:http://www.zghlxwxcb.cn/news/detail-850507.html
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 處理未被路由或因某種原因被退回的消息
}
});
完整代碼文章來源地址http://www.zghlxwxcb.cn/news/detail-850507.html
@Slf4j
@Service
public class DirectProvider implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注解是專門數(shù)據(jù)初始化的注解, 只有其他組件注入后,初始化方法才會執(zhí)行,
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
public void send(OrderingOk orderingOk){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("callbackSender UUID: " + correlationData.getId());
rabbitTemplate.convertAndSend(
"Direct_E01",
"test",orderingOk,
m-> m,correlationData);
}
/**
* 確認(rèn)消息是否被交換機(jī)接收。
*
* @param correlationData 包含消息相關(guān)數(shù)據(jù)的對象,用于識別消息的唯一性。
* @param ack 表示消息是否被交換機(jī)確認(rèn)接收。
* @param cause 如果消息未被接收,提供未接收的原因。
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 獲取相關(guān)數(shù)據(jù)的ID,如果相關(guān)數(shù)據(jù)為空,則設(shè)置為空字符串
String id = correlationData != null ? correlationData.getId() : "";
// 如果消息被確認(rèn)接收,則記錄日志
if (ack) {
log.info("交換機(jī)已經(jīng)收到了ID為:{}的消息", id);
} else {
// 如果消息未被確認(rèn)接收,則記錄包括未接收原因的日志
log.info("交換機(jī)還未收到ID為:{}的消息,由于原因:{}", id, cause);
}
}
/**
* 記錄被交換機(jī)退回的消息信息。
*
* @param message 消息對象,包含消息體。
* @param replyCode 返回的響應(yīng)代碼,用于指示退回的原因。
* @param replyText 返回的響應(yīng)文本,提供關(guān)于退回的詳細(xì)信息。
* @param exchange 退回時(shí)涉及的交換機(jī)名稱。
* @param routingKey 退回時(shí)使用的路由鍵。
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 記錄消息退回的日志信息
log.info("消息{},被交換機(jī):{}退回,原因是:{},路由key是:{}", new String(message.getBody()), exchange, replyCode, replyText, routingKey);
}
到了這里,關(guān)于如何保證消息的可靠性(面試題)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!