系列文章目錄
提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動添加
RabbitMQ之消息的可靠性傳遞
提示:寫完文章后,目錄可以自動生成,如何生成可參考右邊的幫助文檔
前言
提示:這里可以添加本文要記錄的大概內(nèi)容:
在當今的信息化時代,消息傳遞在企業(yè)級應(yīng)用和分布式系統(tǒng)中扮演著至關(guān)重要的角色。而 RabbitMQ 作為一款強大的消息隊列中間件,以其可靠性和高性能成為了眾多開發(fā)者的首選。本文將深入探討 RabbitMQ 中消息的可靠性傳遞機制,以及如何在實際應(yīng)用中確保消息的不丟失。
通過閱讀本文,您將了解到 RabbitMQ 可靠消息傳遞的核心概念和工作原理。我們將探討消息確認、持久性、隊列和交換機的配置以及錯誤處理等關(guān)鍵主題,以幫助您構(gòu)建高度可靠的消息傳遞系統(tǒng)。
無論您是剛剛開始接觸 RabbitMQ,還是已經(jīng)在實際項目中使用它,本文都將為您提供有價值的見解和實用的指導。讓我們一起深入了解 RabbitMQ 的可靠性特性,掌握構(gòu)建可靠系統(tǒng)的關(guān)鍵技能。
提示:以下是本篇文章正文內(nèi)容,下面案例可供參考
一、消息的可靠性傳遞的概念
在RabbitMQ中,消息投遞的路徑為:生產(chǎn)者->交換機->隊列->消費者。而在消息的投遞過程中,每一個環(huán)節(jié)都可能投遞失敗,那么RabbitMQ是通過什么方法確認消息投遞成功的呢?
- 確認模式(confirm)可以監(jiān)聽消息是否從生產(chǎn)者成功傳遞到交換機。
- 退回模式(return)可以監(jiān)聽消息是否從交換機成功傳遞到隊列。
- 消費者消息確認(Consumer Ack)可以監(jiān)聽消費者是否成功處理消息。
二、三種模式的實現(xiàn)
環(huán)境準備
1.首先我們準備兩個SpringBoot項目,分別代表生產(chǎn)者和消費者,配置文件如下:
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: zhangsan
password: zhangsan
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
2.在生產(chǎn)者的配置類創(chuàng)建交換機和隊列
@Configuration
public class RabbitConfig {
private final String EXCHANGE_NAME="my_topic_exchange";
private final String QUEUE_NAME="my_queue";
// 1.創(chuàng)建交換機
@Bean("bootExchange")
public Exchange getExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME) // 交換機類型
.durable(true) // 是否持久化
.build();
}
// 2.創(chuàng)建隊列
@Bean("bootQueue")
public Queue getMessageQueue(){
return QueueBuilder
.durable(QUEUE_NAME) // 隊列持久化
.build();
}
// 3.將隊列綁定到交換機
@Bean
public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
確認模式
1.生產(chǎn)者配置文件開啟確認模式
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: zhangsan
password: zhangsan
virtual-host: /
# 開啟確認模式
publisher-confirm-type: correlated
2.生產(chǎn)者定義確認模式的回調(diào)方法
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm(){
// 定義確認模式的回調(diào)方法,消息向交換機發(fā)送后會調(diào)用confirm方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 被調(diào)用的回調(diào)方法
* @param correlationData 相關(guān)配置信息
* @param ack 交換機是否成功收到了消息
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("confirm接受成功!");
}else{
System.out.println("confirm接受失敗,原因為:"+cause);
// 做一些處理。
}
}
});
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
}
}
退回模式
1.生產(chǎn)者配置文件開啟退回模式
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: zhangsan
password: zhangsan
virtual-host: /
# 開啟確認模式
publisher-confirm-type: correlated
# 開啟回退模式
publisher-returns: true
2.生產(chǎn)者定義退回模式的回調(diào)方法
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testReturn(){
// 定義退回模式的回調(diào)方法。交換機發(fā)送到隊列失敗后才會執(zhí)行returnedMessage方法
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* @param returned 失敗后將失敗信息封裝到參數(shù)中
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息對象:"+returned.getMessage());
System.out.println("錯誤碼:"+returned.getReplyCode());
System.out.println("錯誤信息:"+returned.getReplyText());
System.out.println("交換機:"+returned.getExchange());
System.out.println("路由鍵:"+returned.getRoutingKey());
// 處理消息...
}
});
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
}
}
消費者確認
在 RabbitMQ 中,當消費者接收到消息后,會向隊列發(fā)送一個確認消息,表示已經(jīng)成功接收并處理了該消息。只有當確認消息被發(fā)送后,該消息才會從隊列中被移除。這種機制被稱為消費者消息確認(Consumer Acknowledge,簡稱 Ack)。這就類似于快遞員派送快遞時需要我們簽收一樣,否則快遞就會一直存在于快遞公司的系統(tǒng)中。
消息確認分為自動確認和手動確認兩種方式。自動確認意味著只要消費者接收到消息,無論是否成功處理了該消息,都會自動發(fā)送確認消息,并將消息從隊列中移除。然而,在實際開發(fā)過程中,如果在接收到消息后業(yè)務(wù)處理出現(xiàn)異常,那么消息就可能會丟失。因此,需要設(shè)置手動確認,也就是只有在業(yè)務(wù)處理成功后,才會發(fā)送確認消息通知隊列;如果出現(xiàn)異常,則會發(fā)送拒絕消息,讓消息仍然保留在隊列中。
- 自動確認:spring.rabbitmq.listener.simple.acknowledge=“none”
- 手動確認:spring.rabbitmq.listener.simple.acknowledge=“manual”
1.消費者配置開啟手動簽收
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: zhangsan
password: zhangsan
virtual-host: /
# 開啟手動簽收
listener:
simple:
acknowledge-mode: manual
2.消費者處理消息時定義手動簽收和拒絕簽收的情況
@Component
public class AckConsumer {
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
// 消息投遞序號,消息每次投遞該值都會+1
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
int i = 1/0; //模擬處理消息出現(xiàn)bug
System.out.println("成功接受到消息:"+message);
// 簽收消息
/**
* 參數(shù)1:消息投遞序號
* 參數(shù)2:是否一次可以簽收多條消息
*/
channel.basicAck(deliveryTag,true);
}catch (Exception e){
System.out.println("消息消費失??!");
Thread.sleep(2000);
// 拒簽消息
/**
* 參數(shù)1:消息投遞序號
* 參數(shù)2:是否一次可以拒簽多條消息
* 參數(shù)3:拒簽后消息是否重回隊列
*/
channel.basicNack(deliveryTag,true,true);
}
}
}
總結(jié)
提示:這里對文章進行總結(jié):
文章來源:http://www.zghlxwxcb.cn/news/detail-806402.html
在 RabbitMQ 中,為了確保消息的可靠性傳遞,需要使用確認應(yīng)答和重傳機制。當生產(chǎn)者將消息發(fā)送到 RabbitMQ 服務(wù)器時,服務(wù)器會向生產(chǎn)者返回一個確認應(yīng)答,表示消息已成功接收。如果生產(chǎn)者沒有收到確認應(yīng)答,它可以在一定時間內(nèi)重傳消息,直到收到確認應(yīng)答或達到重試次數(shù)限制。
當消費者從隊列中消費消息時,它會向 RabbitMQ 服務(wù)器發(fā)送一個確認應(yīng)答,表示消息已成功處理。如果消費者在處理消息時出現(xiàn)異?;虮罎?,RabbitMQ 服務(wù)器可以通過重傳機制將消息重新推送給其他消費者進行處理,以確保消息不會丟失。文章來源地址http://www.zghlxwxcb.cn/news/detail-806402.html
到了這里,關(guān)于RabbitMQ之消息的可靠性傳遞的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!