目錄
一、實現(xiàn)消息持久化
1.1、消息的存儲設(shè)定
1.1.1、存儲方式
1.1.2、存儲格式約定
1.1.3、queue_data.txt 文件內(nèi)容
?1.1.4、queue_stat.txt 文件內(nèi)容
1.2、實現(xiàn) MessageFileManager 類
1.2.1、設(shè)計目錄結(jié)構(gòu)和文件格式
1.2.2、實現(xiàn)消息的寫入
1.2.3、實現(xiàn)消息的刪除(隨機訪問文件)
1.2.4、獲取隊列文件中所有有效消息
1.2.5、GC 機制
1.2.6、GC 拓展
二、統(tǒng)一硬盤操作
一、實現(xiàn)消息持久化
1.1、消息的存儲設(shè)定
1.1.1、存儲方式
傳輸?shù)?Message 消息因該如何在硬盤上存儲?我們應(yīng)當(dāng)考慮一下幾點:
- 消息操作并不涉及到復(fù)雜的增刪改查.
- 消息數(shù)量可能會非常多,數(shù)據(jù)庫訪問的效率不是很高.
因此這里不使用數(shù)據(jù)庫進行存儲,而是把消息存儲在文件中~
1.1.2、存儲格式約定
消息是依附于隊列的,因此存儲的時候,就把消息按照 隊列 維度展開.
根據(jù)上一章我們講到數(shù)據(jù)庫的存儲,因此我們已經(jīng)有了 data 目錄(meta.db 就在這個目錄中),這里我們約定 —— 一個隊列就是一個文件目錄,每個對列的文件目錄下有兩個文件,來存儲消息,例如下圖:
- 第一個文件 queue_data.txt:用來保存消息的內(nèi)容;
- 第二個文件 queue_stat.txt:用來保存消息的統(tǒng)計信息;
1.1.3、queue_data.txt 文件內(nèi)容
這里約定,queue_data.txt 文件中包含若干個消息,每個消息都以二進制的方式存儲,每個消息由兩個部分構(gòu)成,
- 第一個部分約定占用 4 個字節(jié),用來保存消息的長度(防止粘包問題).
- 第二個部分為具體的二進制消息數(shù)據(jù)(Message 對象序列化后的數(shù)據(jù)).
如下圖:
?1.1.4、queue_stat.txt 文件內(nèi)容
使用這個文件,來保存消息的統(tǒng)計信息。
這里只存一行文本格式的數(shù)據(jù),并且只有兩列:
- 第一列是 queue_data.txt 中總的消息數(shù)目.
- 第二列是 queue_data.txt 中的有效消息數(shù)目.
這兩者使用 \t 分割,形如:2000\t1500
1.2、實現(xiàn) MessageFileManager 類
1.2.1、設(shè)計目錄結(jié)構(gòu)和文件格式
定義一個內(nèi)部類,表示隊列的統(tǒng)計信息(優(yōu)先考慮 static,和外類解耦合).
static public class Stat {
//對于這樣的簡單類定義成 public 就不用 get set 方法了,類似于 C 的結(jié)構(gòu)體
public int totalCount;
public int validCount;
}
通過以下方法獲取隊列對應(yīng)消息文件的路徑,以及隊列 數(shù)據(jù)/統(tǒng)計 文件的路徑.
/**
* 用來獲取指定隊列對應(yīng)的消息文件所在路徑
* @param queueName
* @return
*/
private String getQueueDir(String queueName) {
return "./data/" + queueName;
}
/**
* 用來獲取該隊列的消息數(shù)據(jù)文件路徑
* 此處使用 txt 文件,存儲二進制數(shù)據(jù),實際上不太合適,但也先這樣吧~
* 跟適合使用 .bin / .dat
* @param queueName
* @return
*/
private String getQueueDataPath(String queueName) {
return getQueueDir(queueName) + "/queue_data.txt";
}
/**
* 用來獲取該隊列的消息統(tǒng)計文件路徑
* @param queueName
* @return
*/
private String getQueueStatPath(String queueName) {
return getQueueDir(queueName) + "/queue_stat.txt";
}
通過以下方法實現(xiàn)隊列 統(tǒng)計 文件的讀寫(便于后續(xù)創(chuàng)建文件時對 統(tǒng)計文件 的初始化).
/**
* 從文件中讀取隊列消息統(tǒng)計信息
* @param queueName
* @return
*/
private Stat readStat(String queueName) {
Stat stat = new Stat();
try(InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 將隊列消息統(tǒng)計信息寫入文件
* @param queueName
* @param stat
*/
private void writeStat(String queueName, Stat stat) {
try(OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
通過以下方法來創(chuàng)建和銷毀文件和目錄
/**
* 創(chuàng)建隊列對應(yīng)的文件和目錄
* @param queueName
*/
public void createQueueFiles(String queueName) throws IOException {
//1.創(chuàng)建隊列對應(yīng)的消息目錄
File baseDir = new File(getQueueDir(queueName));
if(!baseDir.exists()) {
//不存在,就創(chuàng)建這個目錄
boolean ok = baseDir.mkdirs();
if (!ok) {
throw new IOException("創(chuàng)建目錄失敗!baseDir=" + baseDir.getAbsolutePath());
}
}
//2.創(chuàng)建隊列數(shù)據(jù)文件
File queueDataFile = new File(getQueueDataPath(queueName));
if(!queueDataFile.exists()) {
boolean ok = queueDataFile.createNewFile();
if(!ok) {
throw new IOException("創(chuàng)建文件失敗! queueDataFile=" + queueDataFile.getAbsolutePath());
}
}
//3.創(chuàng)建消息統(tǒng)計文件
File queueStatFile = new File(getQueueStatPath(queueName));
if(!queueStatFile.exists()) {
boolean ok = queueStatFile.createNewFile();
if(!ok) {
throw new IOException("創(chuàng)建文件失敗! queueStatFile=" + queueStatFile.getAbsolutePath());
}
}
//4.給消息統(tǒng)計文件,設(shè)定初始值. 0\t0
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName, stat);
}
/**
* 刪除隊列的目錄和文件
* 此方法的用處:隊列也是可以被刪除的,隊列刪除之后,就需要調(diào)用此方法,刪除對應(yīng)的消息文件之類的
* @param queueName
* @throws IOException
*/
public void destroyQueueFiles(String queueName) throws IOException {
//先刪除里面的文件,再刪除目錄
File queueDataFile = new File(getQueueDataPath(queueName));
boolean ok1 = queueDataFile.delete();
File queueStatFile = new File(getQueueStatPath(queueName));
boolean ok2 = queueStatFile.delete();
File baseDir = new File(getQueueDir(queueName));
boolean ok3 = baseDir.delete();
if(!ok1 || !ok2 || !ok3) {
//任意一個失敗,都算整體刪除失敗
throw new IOException("刪除隊列目錄和文件失敗! baseDir=" + baseDir.getAbsolutePath());
}
}
1.2.2、實現(xiàn)消息的寫入
消息寫入主要分為以下四步:
- 先檢查當(dāng)前文件是否存在
- 把 Message 對象進行序列化,轉(zhuǎn)化成 二進制 字節(jié)數(shù)組
- 根據(jù)當(dāng)前隊列文件長度,計算出 Message 對象的 offsetBeg 和 offsetEnd
- 將 message 數(shù)據(jù)追加到文件末尾
- 更新消息統(tǒng)計文件內(nèi)容
/**
* 檢查隊列的目錄和文件是否存在
* 如果后續(xù)有生產(chǎn)者 broker server 生產(chǎn)消息了,這個消息就需要被記錄到文件上(持久化的前提是文件必須要存在)
* @param queueName
* @return
*/
public boolean checkFilesExits(String queueName) {
//數(shù)據(jù)文件和統(tǒng)計文件都判斷存在
File queueDataFile = new File(getQueueDataPath(queueName));
if(!queueDataFile.exists()) {
return false;
}
File queueStatFile = new File(getQueueStatPath(queueName));
if(!queueStatFile.exists()) {
return false;
}
return true;
}
/**
* 將一個新的消息(message)放到隊列文件中(queue)
* @param queue
* @param message
*/
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
//1.先檢查當(dāng)前文件是否存在
if(!checkFilesExits(queue.getName())) {
throw new MqException("[MessageFileManager] 隊列對應(yīng)的文件不存在! queueName=" + queue.getName());
}
//2.把 Message 對象進行序列化,轉(zhuǎn)化成 二進制 字節(jié)數(shù)組
byte[] messageBinary = BinaryTool.toBytes(message);
//3.根據(jù)當(dāng)前隊列文件長度,計算出 Message 對象的 offsetBeg 和 offsetEnd
//將新的 Message 數(shù)據(jù),寫入到文件的末尾,那么此時 offsetBeg = 4 + 當(dāng)前文件總長度 (4 個字節(jié)是我們約定好用來表示信息長度的)
// offsetEnd = 當(dāng)前文件總長度 + 4 + message 長度
//這里為了避免寫操作引發(fā)線程安全問題
synchronized(queue) {
File queueDataFile = new File(getQueueDataPath(queue.getName()));
message.setOffsetBeg(queueDataFile.length() + 4);
message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
//4.將 message 數(shù)據(jù)追加到文件末尾
try(OutputStream outputStream = new FileOutputStream(queueDataFile, true)) { //這里 true 表示追加到文件末尾
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
//這里用 writeInt 來寫 message 長度是為了保證占 4 個字節(jié)(直接用 write 只會寫一個字節(jié))
dataOutputStream.writeInt(messageBinary.length);
//寫入消息體
dataOutputStream.write(messageBinary);
dataOutputStream.flush();
}
}
//5.更新消息統(tǒng)計文件內(nèi)容
Stat stat = readStat(queue.getName());
stat.validCount += 1;
stat.totalCount += 1;
writeStat(queue.getName(), stat);
}
}
1.2.3、實現(xiàn)消息的刪除(隨機訪問文件)
這里的刪除邏輯實際上就是把硬盤中存儲的這個數(shù)據(jù)里面的 isValid 屬性,設(shè)置成 0,然后再寫入硬盤.
- 先把文件中這段數(shù)據(jù)讀出來,還原回 Message 對象
- 把 isValid 改成 0
- 把上述數(shù)據(jù)重新寫回到文件中
- 更新統(tǒng)計文件
為什么這里采用這樣的刪除方式?
新增消息可以直接把消息追加到文件末尾,而刪除消息不好弄~? 因為文件可以視為是一個 “順序表” 的結(jié)構(gòu),因此如果直接刪除中間的元素,就需要設(shè)計到 “順序表搬運” 這樣的操作,效率是非常低的.
因此這里使用邏輯刪除的方式比較合適~~
- 當(dāng)??isValid 為 1,表示有效消息.
- 當(dāng) isValid 為 0 ,表示無效消息
隨著時間的推移文件可能會越來越大,并且可能存在大量的無效消息,針對這種情況,就需要對當(dāng)前消息數(shù)據(jù)文件進行垃圾回收機制(后續(xù)會講到).
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
//讀寫文件注意線程安全問題
synchronized(queue) {
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
//1.先從文件中讀出對應(yīng)的 Message 數(shù)據(jù)
byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc); //類似于食堂打飯
//2.把當(dāng)前讀出來的二進制數(shù)據(jù),反序列化成 Message 對象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
//3.把 isValid 設(shè)置成無效
diskMessage.setIsValid((byte) 0x0);
//此處不用把形參中的 message 的 isValid 設(shè)為 0,因為這個參數(shù)代表內(nèi)存中管理的 Message 對象
//這個對象馬上就會被從內(nèi)存中刪除
//4.重新寫入文件
byte[] bufferDest = BinaryTool.toBytes(diskMessage);
//這里還需要將光標(biāo)移動到最初這個消息的位置,因為 read 操作也會挪動光標(biāo)
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);
// 通過上述折騰,對于文件來說,只有一個字節(jié)發(fā)生改變了而已
}
//更新統(tǒng)計文件,消息無效了,消息個數(shù)就需要 -1
Stat stat = readStat(queue.getName());
if(stat.validCount > 0) {
stat.validCount -= 1;
}
writeStat(queue.getName(), stat);
}
}
Ps:此處這個參數(shù)中的 message 對象,必須得包含有效的 offsetBeg 和 offsetEnd
1.2.4、獲取隊列文件中所有有效消息
讀取文件中有效的(isValid = 1)消息內(nèi)容加載到內(nèi)存中(此方法準(zhǔn)備在程序啟動的時候進行調(diào)用,因此也不需要加鎖)
Ps:
queueName 這里只用這一個參數(shù)就夠了,不需要 MSGQueue 對象
使用 LinkedList 主要是為了后續(xù)進行頭刪的操作
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
LinkedList<Message> messages = new LinkedList<>();
try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {
try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {
//記錄當(dāng)前光標(biāo)位置
long currentOffset = 0;
while(true) {
//1.讀取當(dāng)前消息的長度
int messageSize = dataInputStream.readInt();
//2.按照長度獲取消息內(nèi)容
byte[] buffer = new byte[messageSize];
int actualSize = inputStream.read(buffer);
//比較理論和實際消息長度
if(messageSize != actualSize) {
//如果不匹配說明文件出問題了
throw new MqException("[MessageFileManager] 文件格式錯誤! queueName=" + queueName);
}
//3.把讀到的二進制數(shù)據(jù)反序列化成 Message 對象
Message message = (Message) BinaryTool.fromBytes(buffer);
//4.判斷這個消息是否是無效對象
if(message.getIsValid() != 0x1) {
//無效消息直接跳過
//雖然是無效數(shù)據(jù),但是 offset 不要忘記更新
currentOffset += (4 + messageSize);
continue;
}
//5.有效數(shù)據(jù)就加入到鏈表中,加入前計算一下 offsetBeg 和 offsetEnd
//這個位置需要知道當(dāng)前文件光標(biāo)的位置,由于當(dāng)下使用的 DataInputStream 不方便直接獲取文件光標(biāo)位置, 因此需要使用 currentOffset 手動記錄一下
message.setOffsetBeg(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
//6.最后加入到鏈表當(dāng)中
messages.add(message);
}
} catch (EOFException e) {
//這個 catch 并非真的用來處理 ”異?!?,而是 ”正?!?業(yè)務(wù)邏輯,這是為了當(dāng)消息讀完了能得到一個反饋(有點順?biāo)浦鄣母杏X)
//因為,當(dāng)消息讀取到文件末尾,readInt 就會引發(fā)異常(EOF異常)
System.out.println("[MessageFileManager] 恢復(fù) Message 數(shù)據(jù)完成");
}
}
return messages;
}
1.2.5、GC 機制
這里我們使用 復(fù)制算法 對消息數(shù)據(jù)文件中的垃圾進行回收.
具體的,我們直接遍歷原有的消息數(shù)據(jù)文件,把所有的有效數(shù)據(jù)拷貝到一個新的文件中,再把之前整個舊的文件都刪除,然后將新文件的名字改為舊文件的名字.
什么時候觸發(fā)一次 GC ?
復(fù)制算法比較合適的前提是,當(dāng)前空間里,有效的數(shù)據(jù)不多,大部分是無效的數(shù)據(jù)(減少搬運數(shù)據(jù)的開銷)
因此這里我們約定:當(dāng)總的消息數(shù)目超過 2000 ,并且有效消息的數(shù)目低于總消息數(shù)目的 50%,就觸發(fā)一次 GC (避免 GC 太頻繁,比如一共 4 個消息,其中 2 個消息無效了,就觸發(fā) GC).
Ps:這里的兩個數(shù)字都是自定義的,關(guān)注一定是 策略、思想、方法 ,而不是具體的數(shù)字.
/**
* 檢查是否要針對隊列的消息數(shù)據(jù)文件進行 GC
* @param queueName
* @return
*/
public boolean checkGC(String queueName) {
Stat stat = readStat(queueName);
if(stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {
return true;
}
return false;
}
/**
* 獲取新文件
* @param queueName
* @return
*/
public String getQueueDataNewPath(String queueName) {
return getQueueDir(queueName) + "/queue_data_new.txt";
}
/**
* 執(zhí)行真正的 gc 操作
* 使用復(fù)制算法完成
* 創(chuàng)建一個新的文件,名字叫做 queue_data_new.txt
* 把之前消息數(shù)據(jù)文件中的有效消息都讀出來,寫道新的文件中
* 刪除舊的文件,再把新的文件改名回 queue_data.txt
* 同時要記得更新消息統(tǒng)計文件
* @param queue
*/
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
//gc 意味著 "大洗牌" ,這個過程中其他線程不得干預(yù)
synchronized(queue) {
//由于 gc 操作可能回比較耗時,此處統(tǒng)計一下執(zhí)行耗時的時間
long gcBeg = System.currentTimeMillis();
//1.創(chuàng)建一個新文件
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if(queueDataNewFile.exists()) {
//正常情況下,這個文件是不存在的,如果存在就是以外,說明上次 gc 了一半,中途發(fā)生了以外
throw new MqException("[MessageFileManager] gc 時發(fā)現(xiàn)該隊列的 queue_data_new 已經(jīng)存在! " +
"queueName=" + queue.getName());
}
boolean ok = queueDataNewFile.createNewFile();
if(!ok) {
throw new MqException("[MessageFileManager] 創(chuàng)建文件失敗! queueDataNewFile=" +
queueDataNewFile.getName());
}
//2.從舊文件中讀出所有的有效消息
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
//3.把有效消息寫入新的文件
try(OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
for(Message message : messages) {
byte[] buffer = BinaryTool.toBytes(message);
//先寫消息長度
dataOutputStream.writeInt(buffer.length);
//再寫消息內(nèi)容
dataOutputStream.write(buffer);
}
}
}
//4.刪除舊文件
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
ok = queueDataOldFile.delete();
if(!ok) {
throw new MqException("[MessageFileManager] 刪除舊的文件失敗! queueDataOldFile=" + queueDataOldFile.getName());
}
//把 queue_data_new.txt 重命名成 queue_data.txt
ok = queueDataNewFile.renameTo(queueDataOldFile);
if(!ok) {
throw new MqException("[MessageFileManager] 文件重命名失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath() +
", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
//5.跟新統(tǒng)計文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(), stat);
long gcEnd = System.currentTimeMillis();
System.out.println("[MessageFileManager] gc 執(zhí)行完畢!queueName=" +
queue.getName() + "time=" + (gcEnd - gcBeg) + "ms");
}
}
1.2.6、GC 拓展
當(dāng)某個隊列中,消息特別多,并且很多都是有效的消息,就會導(dǎo)致后續(xù)對這個文件操作的成本上升很多,例如文件大小是 10G,此時如果觸發(fā)一次 GC ,整體的耗時就會非常高了.
對于 RabbitMQ 來說,解決方案就是把一個大的文件拆分成若干個小文件.
- 文件拆分:當(dāng)單個文件長度到達一定閾值以后,就會拆分成兩個文件.(拆著拆著,就成了很多文件).
- 文件合并:每個單獨的文件都會進行 GC ,如果 GC 之后發(fā)現(xiàn)文件變小了很多,就可能會和其他相鄰的文件合并.
具體實現(xiàn)思路:
- 需要專門的數(shù)據(jù)結(jié)構(gòu),來存儲當(dāng)前隊列中有多少個數(shù)據(jù)文件,每個文件大小是多少,消息數(shù)目是多少,無效消息是多少.
- 設(shè)計策略,什么時候觸發(fā)消息拆分,什么時候觸發(fā)文件合并.
Ps:這里可以先不給出具體實現(xiàn),需要的可以私信我(前提是備注微信號).
二、統(tǒng)一硬盤操作
使用這個類來管理所有硬盤上的數(shù)據(jù)
- 數(shù)據(jù)庫:交換機、綁定、隊列
- 數(shù)據(jù)文件:消息
上層邏輯需要操作硬盤,統(tǒng)一通過這個類來操作(上層代碼不關(guān)心當(dāng)前數(shù)據(jù)是存儲再數(shù)據(jù)庫還是文件中的),提高了代碼的內(nèi)聚,可維護性.
public class DiskDataCenter {
//這個實例用來管理數(shù)據(jù)庫中的數(shù)據(jù)
private DataBaseManager dataBaseManager = new DataBaseManager();
//這個實例用來管理數(shù)據(jù)文件中的數(shù)據(jù)
private MessageFileManager messageFileManager = new MessageFileManager();
/**
* 針對上面兩個實例進行初始化
*/
public void init() {
dataBaseManager.init();
// messageFileManager 中 init 是一個空方法,只是先列在這里,一旦后續(xù)需要擴展,就在這里進行初始化即可
messageFileManager.init();
}
//封裝交換機操作
public void insertExchange(Exchange exchange) {
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName) {
dataBaseManager.deleteExchange(exchangeName);
}
public List<Exchange> selectAllExchanges() {
return dataBaseManager.selectAllExchanges();
}
//封裝隊列操作
public void insertQueue(MSGQueue queue) throws IOException {
dataBaseManager.insertQueue(queue);
//創(chuàng)建隊列的同時,不僅需要把隊列寫入到數(shù)據(jù)庫中,還需要創(chuàng)建出對應(yīng)的目錄和文件
messageFileManager.createQueueFiles(queue.getName());
}
public void deleteQueue(String queueName) throws IOException {
dataBaseManager.deleteQueue(queueName);
//刪除隊列的同時,不僅需要把隊列從數(shù)據(jù)庫中刪除,還需要把對應(yīng)的文件和目錄刪除
messageFileManager.destroyQueueFiles(queueName);
}
public List<MSGQueue> selectAllQueue() {
return dataBaseManager.selectAllQueues();
}
//封裝綁定操作
public void insertBinding(Binding binding) {
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding) {
dataBaseManager.deleteBinding(binding);
}
public List<Binding> selectAllBindings() {
return dataBaseManager.selectAllBindings();
}
//封裝消息操作
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue, message);
}
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessage(queue, message);
//這里刪除消息以后還需要看以下文件中是否有太多的無效文件需要進行清除
if(messageFileManager.checkGC(queue.getName())) {
messageFileManager.gc(queue);
}
}
public List<Message> selectAllMessagesFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
}
Ps:這里對隊列和消息的封裝都是具有一定的邏輯的!
隊列:
- 創(chuàng)建隊列的同時,不僅需要把隊列寫入到數(shù)據(jù)庫中,還需要創(chuàng)建出對應(yīng)的目錄和文件
- 刪除隊列的同時,不僅需要把隊列從數(shù)據(jù)庫中刪除,還需要把對應(yīng)的文件和目錄刪除
?消息:文章來源:http://www.zghlxwxcb.cn/news/detail-658983.html
- 刪除消息以后還需要看以下文件中是否有太多的無效文件需要進行清除(GC)
文章來源地址http://www.zghlxwxcb.cn/news/detail-658983.html
到了這里,關(guān)于根據(jù)源碼,模擬實現(xiàn) RabbitMQ - 實現(xiàn)消息持久化,統(tǒng)一硬盤操作(3)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!