概念:在上一章文章中我們演示了消費(fèi)者宕機(jī)的情況下消息沒(méi)有被消費(fèi)成功后會(huì)重新入隊(duì),然后再被消費(fèi),但如何保障RabbitMQ服務(wù)停掉的情況下,生產(chǎn)者發(fā)過(guò)來(lái)的消息不會(huì)丟失,這時(shí)候我們?yōu)榱讼⒉粫?huì)丟失就需要將隊(duì)列和消息都標(biāo)記為持久化。
1、實(shí)現(xiàn)RabbitMQ隊(duì)列持久化
只需要把queueDeclare方法的第二個(gè)參數(shù)改為true即可對(duì)Queue進(jìn)行持久化
package com.ken;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 生產(chǎn)者
*/
public class Producer {
//隊(duì)列名稱(用于指定往哪個(gè)隊(duì)列發(fā)送消息)
public static final String QUEUE_NAME = "durable_queue";
//進(jìn)行發(fā)送操作
public static void main(String[] args) throws Exception{
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置工廠IP,用于連接RabbitMQ的隊(duì)列
factory.setHost("192.168.194.150");
//設(shè)置連接RabbitMQ的用戶名
factory.setUsername("admin");
//設(shè)置連接RabbitMQ的密碼
factory.setPassword("123456");
//創(chuàng)建連接
Connection connection = factory.newConnection();
//獲取信道
Channel channel = connection.createChannel();
/**
* 創(chuàng)建隊(duì)列(持久化隊(duì)列)
* 第一個(gè)參數(shù):隊(duì)列名稱
* 第二個(gè)參數(shù):服務(wù)器重啟后隊(duì)列是否還存在,即隊(duì)列是否持久化,true為是,false為否,默認(rèn)false,即消息存儲(chǔ)在內(nèi)存中而不是硬盤中
* 第三個(gè)參數(shù):該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi),是否進(jìn)行消息共享,true為只允許一個(gè)消費(fèi)者進(jìn)行消費(fèi),false為允許多個(gè)消費(fèi)者對(duì)隊(duì)列進(jìn)行消費(fèi),默認(rèn)false
* 第四個(gè)參數(shù):是否自動(dòng)刪除,最后一個(gè)消費(fèi)者斷開(kāi)連接后該隊(duì)列是否自動(dòng)刪除,true自動(dòng)刪除,false不自動(dòng)刪除
* 第五個(gè)參數(shù):其他參數(shù)
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//發(fā)消息
String message = "Hello World";
/**
* 用信道對(duì)消息進(jìn)行發(fā)布
* 第一個(gè)參數(shù):發(fā)送到哪個(gè)交換機(jī)
* 第二個(gè)參數(shù):路由的Key值是哪個(gè),本次是隊(duì)列名
* 第三個(gè)參數(shù):其他參數(shù)信息
* 第四個(gè)參數(shù):發(fā)送消息的消息體
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息發(fā)送成功!");
}
}
效果圖:
只要Features這個(gè)屬性的值為D,則證明隊(duì)列持久化成功

2、實(shí)現(xiàn)RabbitMQ消息持久化
只需要往basicPublish方法的第三個(gè)參數(shù)傳MessageProperties.PERSISTENT_TEXT_PLAIN,即可對(duì)消息進(jìn)行持久化這個(gè)參數(shù)能告訴RabbitMQ將消息保存到磁盤里進(jìn)行持久化處理,但值得注意的是將消息標(biāo)記為持久化不能完全保證消息不會(huì)丟失,因?yàn)榇嬖谙倻?zhǔn)備存儲(chǔ)到磁盤里,但未完全存儲(chǔ)完的時(shí)間間隔,這時(shí)候如果宕機(jī)了就不能保證消息真正的寫入磁盤重從而實(shí)現(xiàn)持久化,但對(duì)于簡(jiǎn)單任務(wù)隊(duì)列而言,這種持久化策略已經(jīng)夠用了
package com.ken;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* 生產(chǎn)者
*/
public class Producer {
//隊(duì)列名稱(用于指定往哪個(gè)隊(duì)列發(fā)送消息)
public static final String QUEUE_NAME = "durable_queue";
//進(jìn)行發(fā)送操作
public static void main(String[] args) throws Exception{
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置工廠IP,用于連接RabbitMQ的隊(duì)列
factory.setHost("192.168.194.150");
//設(shè)置連接RabbitMQ的用戶名
factory.setUsername("admin");
//設(shè)置連接RabbitMQ的密碼
factory.setPassword("123456");
//創(chuàng)建連接
Connection connection = factory.newConnection();
//獲取信道
Channel channel = connection.createChannel();
/**
* 創(chuàng)建隊(duì)列
* 第一個(gè)參數(shù):隊(duì)列名稱
* 第二個(gè)參數(shù):服務(wù)器重啟后隊(duì)列是否還存在,即隊(duì)列是否持久化,true為是,false為否,默認(rèn)false,即消息存儲(chǔ)在內(nèi)存中而不是硬盤中
* 第三個(gè)參數(shù):該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi),是否進(jìn)行消息共享,true為只允許一個(gè)消費(fèi)者進(jìn)行消費(fèi),false為允許多個(gè)消費(fèi)者對(duì)隊(duì)列進(jìn)行消費(fèi),默認(rèn)false
* 第四個(gè)參數(shù):是否自動(dòng)刪除,最后一個(gè)消費(fèi)者斷開(kāi)連接后該隊(duì)列是否自動(dòng)刪除,true自動(dòng)刪除,false不自動(dòng)刪除
* 第五個(gè)參數(shù):其他參數(shù)
*/
channel.queueDeclare(QUEUE_NAME,false ,false,false,null);
//發(fā)消息
String message = "Hello World";
/**
* 用信道對(duì)消息進(jìn)行發(fā)布(消息持久化,把消息保存到磁盤里,不設(shè)置則保存到內(nèi)存里,容易丟失)
* 第一個(gè)參數(shù):發(fā)送到哪個(gè)交換機(jī)
* 第二個(gè)參數(shù):路由的Key值是哪個(gè),本次是隊(duì)列名
* 第三個(gè)參數(shù):其他參數(shù)信息
* 第四個(gè)參數(shù):發(fā)送消息的消息體
*/
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("消息發(fā)送成功!");
}
}
其他:
1、出現(xiàn)報(bào)錯(cuò)信息:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'durable_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

原因:
當(dāng)前隊(duì)列是未持久化的,需要?jiǎng)h除隊(duì)列然后改成持久化才能重新生效
刪除隊(duì)列:
(1)點(diǎn)擊要?jiǎng)h除的隊(duì)列

(2)找到Delete Queue的按鈕文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-707533.html

(3)點(diǎn)擊確認(rèn)刪除文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-707533.html

到了這里,關(guān)于RabbitMQ系列(8)--實(shí)現(xiàn)RabbitMQ隊(duì)列持久化及消息持久化的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!