目錄
-
一、RabbitMQ 持久化機制
- 1、RabbitMQ 持久化概述
- 2、隊列持久化
- 3、消息持久化
- 4、交換器持久化
-
二、RabbitMQ 知識擴展
- 1、內存告警與內存換頁
- 2、磁盤告警與配置
- 3、數(shù)據(jù)寫入磁盤時機
- 4、磁盤消息格式
- 5、磁盤文件刪除機制
一、RabbitMQ 持久化機制
1、RabbitMQ 持久化概述
持久化,即將原本存在于內存中的數(shù)據(jù)寫入到磁盤上永久保存數(shù)據(jù),防止服務宕機時內存數(shù)據(jù)的丟失。
Rabbitmq 的持久化分為隊列持久化、消息持久化和交換器持久化。
對于消息來說,不管是持久化的消息還是非持久化的消息都可以被寫入到磁盤。持久化的消息會同時寫入磁盤和內存(加快讀取速度),非持久化消息會在內存不夠用時,將消息寫入磁盤(一般重啟之后就沒有了)。
2、隊列持久化
隊列的持久化是在定義隊列時的通過 durable
參數(shù)來決定的,當 durable 為 true 時,才代表隊列會持久化。例如:
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二個餐胡設置為true,代表隊列持久化
channel.queueDeclare("queue.persistent.name", true, false, false, null);
關鍵的是第二個參數(shù)設置為 true,即 durable = true。Channel 類中 queueDeclare 的完整定義如下:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
參數(shù)說明:
- queue:queue 的名稱
-
exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,并在連接斷開時自動刪除。這里需要注意三點:
- 排他隊列是基于連接可見的,同一連接的不同信道是可以同時訪問同一連接創(chuàng)建的排他隊列;
- “首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
- 即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用于一個客戶端發(fā)送讀取消息的應用場景。
- autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用于臨時隊列。
總結:如果將 queue 的持久化標識 durable 設置為 true,則代表是一個持久的隊列。當服務重啟之后,隊列仍然會存在,這是因為服務會把持久化的 queue 存放在硬盤上,當服務重啟的時候,會重新加載這些被持久化的 queue。
3、消息持久化
隊列是可以被持久化,但是里面的消息是否為持久化那還要看消息的持久化設置。也就是說,重啟之前 queue 里面如果還有未發(fā)出去的消息的話,重啟之后,消息是否還存在隊列里面就要取決于在發(fā)送消息時對消息的設置。
消息持久化的實現(xiàn)需要在發(fā)送消息時設置消息的持久化標識,例如:
channel.basicPublish("exchange01", "routing_key01", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_message".getBytes());
方法原型是:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
這里關鍵的是 BasicProperties props
這個參數(shù),它的定義如下:
public BasicProperties(
String contentType,//消息類型如:text/plain
String contentEncoding,//編碼
Map<String,Object> headers,
Integer deliveryMode,//1:nonpersistent 2:persistent
Integer priority,//優(yōu)先級
String correlationId,
String replyTo,//反饋隊列
String expiration,//expiration到期時間
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId
)
其中 deliveryMode=1
代表不持久化,deliveryMode=2
代表持久化。而代碼實現(xiàn)中的 MessageProperties.PERSISTENT_PLAIN
值是官方提供的一個將 deliveryMode 設置為 2 的 BasicProperties 的對象:
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties(
"text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null
);
除此之外,我們也可以使用另一種方式來設置消息持久化標志位:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); // 將 deliveryMode 值設置為 2 表示消息持久化
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange01", "routing_key01", properties, "persistent_message".getBytes());
至此,我們可以知道:當 broker 服務其重啟后,想要消息不丟失,既需要設置隊列持久化,也需要設置消息持久化。
擴展知識:
basicPublish 方法還有另外兩個重載方法:
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
這里有兩個關鍵的參數(shù):mandatory
和 immediate
。這兩個標識位都有當消息傳遞過程中不可達目的地時將消息返回給生產者的功能。下面簡單講解下這兩個標識位:
1)mandatory
- 當 mandatory 標志位設置為 true 時:如果 exchange 根據(jù)自身類型和消息 routeKey 無法找到一個符合條件的 queue,那么會調用
basic.return
方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body); - 當 mandatory 設置為 false 時,如果出現(xiàn)上述情形的話,broker 會直接將消息扔掉。
2)immediate
- 當 immediate 標志位設置為 true 時:如果 exchange 在將消息路由到 queue(s) 時發(fā)現(xiàn)對于的 queue 上么有消費者,那么這條消息不會放入隊列中。當與消息 routeKey 關聯(lián)的所有 queue(一個或者多個)都沒有消費者時,該消息會通過
basic.return
方法返還給生產者。
概括來說就是:
- mandatory 標志告訴服務器至少將該消息 route 到一個隊列中,否則將消息返還給生產者;
- immediate 標志告訴服務器如果該消息關聯(lián)的 queue上有消費者,則馬上將消息投遞給它,如果所有 queue 都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。
4、交換器持久化
對于消息的可靠性來說,只需要設置隊列的持久化和消息的持久化即可。exchange 的持久化并沒有什么影響,但是,如果 exchange 不設置持久化的話,當 broker 服務重啟之后,exchange 將不復存在,這樣會導致消息發(fā)送者 producer 無法正常發(fā)送消息。
所以,建議同樣設置 exchange 的持久化。而 exchange 的持久化設置也特別簡單,設置方法原型如下:
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable)throws IOException;
- exchange:交換器的名稱;
-
type:交換器的類型,常見的如
fanout direct topic
; -
durable:持久話標志位, durable 設置為
true
表示持久化, 反之為非持久 。
所以,只需要在聲明的時候將 durable 字段設置為 true 即可:
channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
二、RabbitMQ 知識擴展
1、內存告警與內存換頁
1-1、內存告警
當內存使用超過配置的閾值時,RabbitMQ 會暫時阻塞客戶端的連接,并停止接收從客戶端發(fā)來的消息,以避免服務崩潰,客戶端與服務端的心跳檢測也會失敗。
當出現(xiàn)內存告警時,可以通過管理命令臨時調整內存大?。?/p>
RabbitMQctl set_vm_memory_high_watermark <fraction>
-
fraction
為內存閾值,RabbitMQ 默認是 0.4,表示當 RabbitMQ 使用的內存超過總內存的 40% 時,就會產生告警并阻塞所有生產則連接。
通過此命令修改的閾值在 RabbitMQ 重啟之后將會失效,如果想要設置的閾值永久有效需要修改配置文件:
# 相對值,也就是前面的fraction,建議設置在0.4~0.66之間,不要超過0.7
vm_memory_high_watermark.relative=0.4
# 絕對值,單位為KB,MB,GB,對應的臨時命令是:RabbitMQctl set_vm_memory_high_watermark absolute <value>
#vm_memory_high_watermark.absolute=1GB
修改完配置文件后,需要重啟服務才會生效。
1-2、內存換頁
在某個 broker 節(jié)點觸及內存閾值并阻塞生產者之前,它會嘗試將隊列內存中的消息換頁存儲到磁盤以釋放內存空間。持久化和非持久化的消息都會被轉儲到磁盤中,其中持久化的消息本身就在磁盤中有一個備份,所以這里會將持久化的消息直接從內存中清除掉。
默認情況下,在內存使用達到設置的閾值的 50% 時會進行換頁操作。也就是說,在默認的內存閾值 40% 的情況下,當內存超過 40% * 50% = 20%
時會經行換頁動作。
內存換頁閾值可以通過在配置文件中設置來進行調整:
vm_memory_high_watermark_paging_ratio=0.75
上面的配置將會在 RabbitMQ 內存使用率達到 30%(假設內存閾值是 0.4)時進行換頁動作,并在 40% 時阻塞生產者(當 vm_memory_high_watermark_paging_ratio 的值大于 1 時,相當于禁用了換頁功能)。
2、磁盤告警與配置
2-1、磁盤告警
當磁盤剩余空間低于設置的閾值時,RabbitMQ 同樣會阻塞生產者,這樣可以避免因非持久化的消息持續(xù)換頁而耗盡磁盤空間導致服務崩潰。
默認情況下,磁盤的閾值是50M,表示當磁盤剩余空間低于50M時,會阻塞生產者并停止內存中消息的換頁動作。這個閾值的設置可以減小,但不能完全消除因磁盤耗盡而導致崩潰的可能性。比如在兩次磁盤空間檢測期間內,磁盤空間從大于50M被耗盡到0M。
2-2、修改磁盤告警閾值
可以通過以下命令臨時調整磁盤閾值:
#設置具體大小,單位為KB/MB/GB
RabbitMQctl set_disk_free_limit <disk_limit>
#設置相對值,建議取值為1.0~2.0(相對于內存的倍數(shù),如內存大小是8G,若為1.0,則表示磁盤剩余8G時,阻塞)
RabbitMQctl set_disk_free_limit mem_relative <fraction>
如果要永久生效需要對應的配置文件,配置如下(需要重啟生效):
disk_free_limit.relative=2.0
#disk_free_limit_absolute=50MB
這里有個建議:一個相對謹慎的做法是將磁盤閾值設置為與操作系統(tǒng)所顯示的內存大小一致。
3、數(shù)據(jù)寫入磁盤時機
- 消息的正常寫入磁盤流程為:消息數(shù)據(jù)寫入到緩存 Buffer 中(大小為 1 M),Buffer 數(shù)據(jù)滿了之后會寫入內存文件中,最后再刷新到磁盤文件中;
- 存在個固定的刷盤時間:25ms,也就是不管 Buffe r滿不滿,每隔 25ms,Buffer 里的數(shù)據(jù)及未刷新到磁盤的文件內容必定會刷到磁盤;
- 每次消息寫入后,如果沒有后續(xù)寫入請求,則會直接將已寫入的消息刷到磁盤:使用 Erlang 的
receive x after 0
來實現(xiàn)。只要進程的信箱里沒有消息,則產生一個 timeout 消息,而 timeout 會觸發(fā)刷盤操作。
4、磁盤消息格式
消息保存于 $MNESIA/msg_store_persistent/x.rdq
文件中,其中 x 為數(shù)字編號,從 1 開始,每個文件最大為 16M(16777216),超過這個大小會生成新的文件,文件編號加 1。
文件中的消息格式如下:文章來源:http://www.zghlxwxcb.cn/news/detail-619562.html
<<Size:64, MsgId:16/binary, MsgBody>>
-
MsgId 為 RabbitMQ 通過
rabbit_guid:gen()
每一個消息生成的 GUID; - MsgBody 會包含消息對應的 exchange,routing_keys,消息的內容,消息對應的協(xié)議版本,消息內容格式(二進制還是其它)等等。
5、磁盤文件刪除機制
- 當所有磁盤文件中的垃圾消息(已經被刪除的消息)比例大于閾值(GARBAGE_FRACTION = 0.5)時,會觸發(fā)文件合并操作(至少有三個文件存在的情況下),以提高磁盤利用率。
- publish 消息時寫入內容,ack 消息時刪除內容(更新該文件的有用數(shù)據(jù)大?。斠粋€文件的有用數(shù)據(jù)等于 0 時,刪除該文件。
相關官方文檔:文章來源地址http://www.zghlxwxcb.cn/news/detail-619562.html
- https://blog.RabbitMQ.com/posts/2011/01/RabbitMQ-backing-stores-databases-and-disks
- https://www.RabbitMQ.com/persistence-conf.html
到了這里,關于【RabbitMQ】之持久化機制的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!