RabbitMQ持久化
如何保障當(dāng) RabbitMQ 服務(wù)停掉以后消 息生產(chǎn)者發(fā)送過來的消息不丟失。默認情況下 RabbitMQ 退出或由于某種原因崩潰時,它忽視隊列 和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標記為持久化
隊列如何實現(xiàn)持久化
之前我們創(chuàng)建的隊列都是非持久化的,RabbitMQ如果重啟的話,該隊列就會被刪除,如果要實現(xiàn)隊列持久化,需要在聲明隊列的時候把durable參數(shù)設(shè)置持久化
?但是需要注意的就是如果之前聲明的隊列不是持久化的,需要把原先隊列先刪除,或者重新 創(chuàng)建一個持久化的隊列,不然就會出現(xiàn)錯誤
以下是控制臺中持久化與非持久化隊列的UI顯示區(qū),當(dāng)Features列顯示為D代表是持久化隊列
?
消息實現(xiàn)持久化
要想讓消息實現(xiàn)持久化需要在消息生產(chǎn)者修改代碼,MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個屬性
生產(chǎn)者代碼:
/*
* 消息再手動應(yīng)答不丟失、放回消息隊列重新消費
*/
public class Task2 {
//隊列名稱
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/*
*生成一個隊列
* 1.隊列名稱
* 2.隊列里面的信息是否持久化(磁盤)默認情況時在內(nèi)存
* 3.該隊列是否只供一個消費者進行消費 是否消費共享 true是允許
* 4.是否自動刪除 最后一個消費者斷開連接之后 該隊列是否自動刪除 true自動刪除 false不自動刪除
* 5.其他參數(shù) 延遲消息等
*/
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
//從控制臺中輸入信息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息:"+message);
}
}
}
將消息標記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是 這里依然存在當(dāng)消息剛準備存儲在磁盤的時候 但是還沒有存儲完,消息還在緩存的一個間隔點。此時并沒有真正寫入磁盤。持久性保證并不強,但是對于我們的簡單任務(wù)隊列而言,這已經(jīng)綽綽有余了
不公平分發(fā)
在最開始的時候我們學(xué)習(xí)到 RabbitMQ 分發(fā)消息采用的輪訓(xùn)分發(fā),但是在某種場景下這種策略并不是 很好,比方說有兩個消費者在處理任務(wù),其中有個消費者 1 處理任務(wù)的速度非??欤硗庖粋€消費者 2 處理速度卻很慢,這個時候我們還是采用輪訓(xùn)分發(fā)的化就會到這處理速度快的這個消費者很大一部分時間 處于空閑狀態(tài),而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進行分發(fā)。為了避免這種情況,我們可以設(shè)置參數(shù) channel.basicQos(1),將想要不公平分發(fā)的消費者設(shè)置該參數(shù)
消費者01代碼如下:
public class Work01 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息短");
//消息消費的時候如何處置消息
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:"+message);
/**
* 1.消息標記tag
* 2.是否批量應(yīng)答未應(yīng)答的消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//取消消息的回調(diào)
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消費被中斷");
};
/**
* 消費者信息
* 1.消費哪個隊列
* 2.消費成功之后是否要自動應(yīng)答 true自動應(yīng)答 false手動應(yīng)答
* 3.消費者微車才能更改消費的回調(diào)
* 4.消費者取消消費回調(diào)
*/
boolean autoAck = false;
//設(shè)置不公平分發(fā)
channel.basicQos(1);
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
消費者02代碼如下:
public class Work02 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息長");
//消息消費的時候如何處置消息
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
SleepUtils.sleep(30);
System.out.println("接收到消息:"+message);
/**
* 1.消息標記tag
* 2.是否批量應(yīng)答未應(yīng)答的消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//取消消息的回調(diào)
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消費被中斷");
};
/**
* 消費者信息
* 1.消費哪個隊列
* 2.消費成功之后是否要自動應(yīng)答 true自動應(yīng)答 false手動應(yīng)答
* 3.消費者微車才能更改消費的回調(diào)
* 4.消費者取消消費回調(diào)
*/
boolean autoAck = false;
//設(shè)置不公平分發(fā)
channel.basicQos(1);
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
?
效果展示:
文章來源:http://www.zghlxwxcb.cn/news/detail-827597.html
預(yù)取值
本身消息的發(fā)送就是異步發(fā)送的,所以在任何時候,channel 上肯定不止只有一個消息另外來自消費 者的手動確認本質(zhì)上也是異步的。因此這里就存在一個未確認的消息緩沖區(qū),因此希望開發(fā)人員能限制此 緩沖區(qū)的大小,以避免緩沖區(qū)里面無限制的未確認消息問題。這個時候就可以通過使用 basic.qos 方法設(shè) 置“預(yù)取計數(shù)”值來完成的。該值定義通道上允許的未確認消息的最大數(shù)量。一旦數(shù)量達到配置的數(shù)量, RabbitMQ 將停止在通道上傳遞更多消息,除非至少有一個未處理的消息被確認,例如,假設(shè)在通道上有 未確認的消息 5、6、7,8,并且通道的預(yù)取計數(shù)設(shè)置為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何 消息,除非至少有一個未應(yīng)答的消息被 ack。比方說 tag=6 這個消息剛剛被確認 ACK,RabbitMQ 將會感知 這個情況到并再發(fā)送一條消息。消息應(yīng)答和 QoS 預(yù)取值對用戶吞吐量有重大影響。通常,增加預(yù)取將提高 向消費者傳遞消息的速度。雖然自動應(yīng)答傳輸消息速率是最佳的,但是,在這種情況下已傳遞但尚未處理 的消息的數(shù)量也會增加,從而增加了消費者的 RAM 消耗(隨機存取存儲器)應(yīng)該小心使用具有無限預(yù)處理 的自動確認模式或手動確認模式,消費者消費了大量的消息如果沒有確認的話,會導(dǎo)致消費者連接節(jié)點的 內(nèi)存消耗變大,所以找到合適的預(yù)取值是一個反復(fù)試驗的過程,不同的負載該值取值也不同 100 到 300 范 圍內(nèi)的值通??商峁┳罴训耐掏铝?,并且不會給消費者帶來太大的風(fēng)險。預(yù)取值為 1 是最保守的。當(dāng)然這 將使吞吐量變得很低,特別是消費者連接延遲很嚴重的情況下,特別是在消費者連接等待時間較長的環(huán)境 中。對于大多數(shù)應(yīng)用來說,稍微高一點的值將是最佳的文章來源地址http://www.zghlxwxcb.cn/news/detail-827597.html
到了這里,關(guān)于【初始RabbitMQ】持久化的實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!