目錄
?一、消息存儲格式設(shè)計(jì)
? ? ? ??? 1、queue_data.txt:保存消息的內(nèi)容
? ? ? ? ?? 2、queue_stat.txt:保存消息的統(tǒng)計(jì)信息
二、消息序列化
三、自定義異常類
四、創(chuàng)建MessageFileManger類
?? 1、約定消息文件所在的目錄和文件名字
??? 2、隊(duì)列的統(tǒng)計(jì)信息
?? 3、創(chuàng)建隊(duì)列對應(yīng)的目錄和功能
?? 4、實(shí)現(xiàn)刪除隊(duì)列的文件和目錄
?? 5、檢查隊(duì)列的目錄和文件是否都存在
?? 6、把消息寫入到文件中
?? 7、刪除文件中的消息
?? 8、將硬盤中的數(shù)據(jù)加載到內(nèi)存中
?? 9、實(shí)現(xiàn)消息文件的垃圾回收
?????????? 檢測是否要進(jìn)行GC
? ? ? ? ?? 構(gòu)造新目錄
? ? ? ? ?? 進(jìn)行GC操作
五、測試MessageFileManager類
?? 1、“準(zhǔn)備工作”和“收尾工作”
?? 2、測試創(chuàng)建文件是否存在
??? 3、測試writetStat和readStat是否能夠通過
??? 4、測試sendMessage
?? 5、測試刪除消息
?? 6、測試?yán)厥?/p>
六、小結(jié)
?一、消息存儲格式設(shè)計(jì)
對于消息,并不打算存儲在數(shù)據(jù)庫中:
????????(1)消息操作并不會涉及到復(fù)雜的增刪改查
????????(2)消息的數(shù)量可能會非常多,數(shù)據(jù)庫的訪問效率并不高
?所以,我們直接把消息存儲在文件中。
那么消息要如何在文件中存儲呢?
首先消息,它是依附于隊(duì)列的,所以在存儲的時(shí)候,就把消息按照隊(duì)列的維度展開。
我們會將隊(duì)列存儲在和數(shù)據(jù)庫同級的data目錄中,在data中創(chuàng)建一些子目錄,子目錄的名字就是隊(duì)列名。
然后在每個(gè)隊(duì)列的子目錄下面,再分配兩個(gè)文件,用來存儲消息。主要是以下兩個(gè)文件。
? ? ? ??? 1、queue_data.txt:保存消息的內(nèi)容
????????Message是一個(gè)二進(jìn)制格式的文件,包含若干個(gè)消息,每個(gè)消息都以二進(jìn)制的方式存儲,每個(gè)消息都由這幾個(gè)部分構(gòu)成:Message對象序列化之后。
? ??????Messag對象,會分別再內(nèi)存和硬盤上都記錄一份。內(nèi)存中的衣服呢會記錄offsetBegin和offsetEnd。這樣就可以找到內(nèi)存中的Message對象,能夠找到對應(yīng)的硬盤上的message對象。
????????關(guān)于isValid:是用來標(biāo)識當(dāng)前消息在文件中是否有效,為1就是有效的消息,為0就是無效的消息。當(dāng)為0時(shí)就相當(dāng)于邏輯上的刪除消息功能。但是,隨著時(shí)間的推移,消息的增多,那么該消息可能就會大部分都是無效的消息,針對這種情況,就需要對當(dāng)前消息的文件,進(jìn)行垃圾回收。
以下是本程序中實(shí)現(xiàn)的垃圾回收功能:? ?
? 垃圾回收(GC):使用復(fù)制算法,針對消息數(shù)據(jù)文件中的垃圾回收進(jìn)行回收。直接遍歷原有的消息數(shù)據(jù)文件,把有效的數(shù)據(jù)拷貝到一個(gè)新的文件中,然后把之前的舊文件都刪除掉(邏輯刪除)。
作出約定(可以不按這個(gè)來),當(dāng)總消息數(shù)目超過了2000,并且有效的消息數(shù)目低于總數(shù)目的50%,就出發(fā)一次垃圾回收。
?對于RabbitMQ解決垃圾回收的方式如下:
?如果某個(gè)消息隊(duì)列中,消息很多,都是有效消息,就會導(dǎo)致整個(gè)消息的數(shù)據(jù)文件特別答,后續(xù)針對文件的各種操作,成本就會很高。RabbitMQ解決方案如下:
? ? 文件拆分:當(dāng)單個(gè)文件長度達(dá)到一定閾值以后,就會拆分成兩個(gè)文件(拆分次數(shù)越多文件就越多)
? ? 文件合并:每個(gè)單獨(dú)的文件都會進(jìn)行GC,如果GC之后,就會發(fā)現(xiàn)文件變小很多,當(dāng)小到一定程度,就會和其他文件合并。
這樣就會再消息特別多的時(shí)候,也能保證性能上的及時(shí)響應(yīng)。
? ? ? ? ?? 2、queue_stat.txt:保存消息的統(tǒng)計(jì)信息
????????使用這個(gè)文件,來保存消息的統(tǒng)計(jì)信息。
????????只存一行數(shù)據(jù),文本格式。然后一行包括兩列:兩者使用 \t 來分割,形如2000\t500
? ? ? ? ????????第一列是queue_data.txt中總的消息的數(shù)目
? ? ? ????????? 第二列是queue_data.txt中有效消息的數(shù)目。
二、消息序列化
消息序列化:把一個(gè)對象(結(jié)構(gòu)化數(shù)據(jù))轉(zhuǎn)成一個(gè)字符串/字節(jié)數(shù)組。
在序列化之后,對象的信息是不會丟失的,這樣就會方便與存儲和傳輸(在文件中存儲時(shí),只能以字符串/二進(jìn)制數(shù)據(jù)的方式存儲對象)。后面需要用的時(shí)候,就再反序列化。
由于消息的body是二進(jìn)制數(shù)據(jù),所以這里不會使用JSON進(jìn)行序列化。
針對二進(jìn)制序列化,有很多解決方案:????????
? ? ? ? (1)Java標(biāo)準(zhǔn)庫中提供了序列化的方案:ObjectInputStream 和?ObjectOutputStream
? ? ? ? (2)Hession
? ? ? ? (3)protobuffer
? ? ? ? (4)thrift
這里使用第一種,這樣就不用再引入額外的依賴。
在commen中創(chuàng)建一個(gè)BinaryTool,因?yàn)楹竺婵蛻舳诉€會用到這個(gè)類,所以放在公共的包中。
/*
* 序列化和反序列化
* 實(shí)現(xiàn)Serializable接口才能讓這個(gè)對象進(jìn)行序列化和反序列化
* */
public class BinaryTool {
// 把一個(gè)對象序列化成一個(gè)字節(jié)數(shù)組
public static byte[] toBytes(Object object) throws IOException {
// 這個(gè)流對象相當(dāng)于一個(gè)邊長的字節(jié)數(shù)組
// 就可以把object序列化的數(shù)組逐漸寫入到byteArrayOutputStream中,然后統(tǒng)一轉(zhuǎn)成byte[]
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// 此處的writeObject就會把該對象進(jìn)行序列化,生成二進(jìn)制字節(jié)數(shù)據(jù),就會寫入到ObjectOutputStream中
// 由于ObjectOutputStream又關(guān)聯(lián)到了ByteArrayOutputStream,最終結(jié)果就寫入到ByteArrayOutputStream里面了
objectOutputStream.writeObject(object);
}
// 該操作就是把byteArrayOutputStream中持有的二進(jìn)制數(shù)據(jù)取出來,轉(zhuǎn)成byte[]
return byteArrayOutputStream.toByteArray();
}
}
// 把一個(gè)字節(jié)數(shù)組,反序列化成一個(gè)對象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
// 此處的readObject,就是從data的byte[]中讀取數(shù)據(jù)并且進(jìn)行反序列化
object = objectInputStream.readObject();
}
}
return object;
}
}
三、自定義異常類
?
自定義一個(gè)異常類,如果是mq的業(yè)務(wù)邏輯中出的異常,就拋出這個(gè)異常類
/*
*自定義異常類
*/
public class MqException extends Exception{
public MqException(String reason){
super(reason);
}
}
四、創(chuàng)建MessageFileManger類
?對硬盤上的消息進(jìn)行管理的類。
?? 1、約定消息文件所在的目錄和文件名字
//這里的init()方法暫時(shí)不用,只是列在這,后面可能擴(kuò)展
public void init(){
// 后續(xù)擴(kuò)展
}
// 約定的那個(gè)消息文件所在的目錄和文件名
// 用來獲取到指定隊(duì)列對應(yīng)的消息文件所在的路徑
private String getQueueDir(String queueName){
return "./data/" + queueName;
}
// 用來獲取該隊(duì)列的消息數(shù)據(jù)文件路徑
private String getQueueDataPath(String queueName){
return getQueueDir(queueName) + "/queue_data.txt";
}
// 用來獲取該隊(duì)列列的消息統(tǒng)計(jì)文件路徑
private String getQueueStatPath(String queueName){
return getQueueDir(queueName) + "/queue_stat.txt";
}
??? 2、隊(duì)列的統(tǒng)計(jì)信息
定義一個(gè)內(nèi)部類,來表示該隊(duì)列的統(tǒng)計(jì)信息:
// 定義一個(gè)內(nèi)部類,來表示該隊(duì)列的統(tǒng)計(jì)信息
static public class Stat{
// 直接定義成public
public int totalCount; //總消息數(shù)量
public int validCount;//有效消息的數(shù)量
}
然后實(shí)現(xiàn)消息統(tǒng)計(jì)的讀寫功能:
private Stat readStat(String queueName){
// 由于當(dāng)前的消息統(tǒng)計(jì)文件是文本文件,直接使用scanner讀取文件內(nèi)容
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
private void writeStat(String queueName,Stat stat){
// 使用PrinWrite來寫
// OutputStream打開文件,默認(rèn)情況下,會直接把源文件清空,新數(shù)據(jù)會覆蓋原數(shù)據(jù)
// 這里直接覆蓋就可以了
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
?? 3、創(chuàng)建隊(duì)列對應(yīng)的目錄和功能
// 創(chuàng)建隊(duì)列對應(yīng)的文件和目錄
public void createQueueFiles(String queueName) throws IOException {
// 1.創(chuàng)建隊(duì)列對應(yīng)的消息目錄
File baseDir = new File(getQueueDir(queueName));
if (!baseDir.exists()){
// 不存在,就創(chuàng)建這個(gè)目錄
boolean ok = baseDir.mkdirs();
if (!ok){
throw new IOException("創(chuàng)建目錄失??!baseDir = " + baseDir.getAbsolutePath());
}
}
// 2.創(chuàng)建隊(duì)列數(shù)據(jù)文件
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()){
boolean ok = queueDataFile.createNewFile();
if (!ok){
throw new IOException("創(chuàng)建文件失??!queueDataFile = " + queueDataFile.getAbsolutePath());
}
}
// 3.創(chuàng)建消息統(tǒng)計(jì)文件
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()){
boolean ok = queueStatFile.createNewFile();
if (!ok){
throw new IOException("創(chuàng)建文件失?。ueueStatFile = " + queueStatFile.getAbsolutePath());
}
}
// 4.給消息統(tǒng)計(jì)文件,設(shè)定初始值 0\t0
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName,stat);
}
?? 4、實(shí)現(xiàn)刪除隊(duì)列的文件和目錄
// 實(shí)現(xiàn)刪除隊(duì)列的文件和目錄
public void destroyQueueFiles(String queueName) throws IOException {
// 先刪除文件,在刪除目錄
File queueDataFile = new File(getQueueDataPath(queueName));
boolean ok1 = queueDataFile.delete();
File queueStatFile = new File(getQueueStatPath(queueName));
boolean ok2 = queueStatFile.delete();
File baseDir = new File(getQueueDir(queueName));
boolean ok3 = baseDir.delete();
if (!ok1 || !ok2 || !ok3){
// 有任意一個(gè)刪除失敗,就算整體刪除失敗
throw new IOException("刪除隊(duì)列目錄和文件失??!baseDir = " + baseDir.getAbsolutePath());
}
}
?? 5、檢查隊(duì)列的目錄和文件是否都存在
后續(xù)有生產(chǎn)者給brocker server生產(chǎn)消息了,這個(gè)消息就可能需要記錄到文件上(取決消息是否要持久化)。但是判斷是否要持久化之前,需要檢查隊(duì)列中的文件是否存在。
// 檢查隊(duì)列的目錄和文件是否存在
public boolean checkFileExists(String queueName){
// 判斷隊(duì)列的數(shù)據(jù)文件和統(tǒng)計(jì)文件是否都存在
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()){
return false;
}
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()){
return false;
}
return true;
}
?? 6、把消息寫入到文件中
把一個(gè)新的消息,放到隊(duì)列對應(yīng)的文件中。主要包含以下三步:
(1)檢查寫入的隊(duì)列是否存在;
(2)把Message對象進(jìn)行序列化,轉(zhuǎn)成二進(jìn)制的字節(jié)數(shù)組;
(3)獲取到當(dāng)前數(shù)據(jù)的文件長度,使用[offsetBegin,offsetEnd]。把新的Message數(shù)據(jù),寫入到隊(duì)列數(shù)據(jù)文件的末尾,此時(shí),
? ?Message對象的offsetBegin,就是當(dāng)前文件長度 +4
???offsetEnd就是當(dāng)前文件長度 + 4 + message自身長度
(4)寫入消息到數(shù)據(jù)文件,追加到數(shù)據(jù)文件末尾
(5)更新消息統(tǒng)計(jì)文件
寫入文件時(shí)的線程安全問題:
* 如果兩個(gè)線程,是往同一個(gè)隊(duì)列中寫消息,此時(shí)就需要阻塞等待;
假設(shè)現(xiàn)在有兩個(gè)線程t1,t2。如果沒有加鎖,那么他們的目的就是將一個(gè)message寫入到104~124之間去。但是,此時(shí)可能就會導(dǎo)致t1計(jì)算長度以后,沒有進(jìn)行寫文件;t2就開始計(jì)算長度了,并且執(zhí)行了寫文件操作,寫完以后,t1才開始寫,但是此時(shí)t1就不是從104寫了,而是從124開始寫。這樣會導(dǎo)致queue_data多出一段。
?
?所以這里我們就需要對隊(duì)列進(jìn)行加鎖。
* 如果兩個(gè)線程,需要往不同的隊(duì)列中些消息,此時(shí)就不需要阻塞等待。
總體代碼如下:
// 該方法用于把一個(gè)新的消息,放到隊(duì)列對應(yīng)的文件中
// queue表示要把消息寫入的隊(duì)列,message則是要寫的消息
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 1、檢查要寫入的隊(duì)列是否存在
// 如果不存在
if (!checkFileExists(queue.getName())){
throw new MqException("[MessageFileManager] 隊(duì)列對應(yīng)的文件不存在!queueName = " + queue.getName());
}
// 2、把Message對象進(jìn)行序列化,轉(zhuǎn)成二進(jìn)制的字節(jié)數(shù)組
byte[] messageBinary = BinaryTool.toBytes(message);
// 這個(gè)鎖是,當(dāng)有兩個(gè)對象針對同一個(gè)對象操作時(shí),鎖才會有效
synchronized (queue){
// 3、先獲取到當(dāng)前的隊(duì)列數(shù)據(jù)文件長度,使用[offsetBegin,offsetEnd]
// 把新的Message數(shù)據(jù),寫入到隊(duì)列數(shù)據(jù)文件的末尾,此時(shí)Message對象的offsetBegin,就是當(dāng)前文件長度+4
// offsetEnd就是當(dāng)前文件長度 + 4 + message自身長度
File queueDataFile = new File(getQueueDataPath(queue.getName()));
// 獲取到文件的長度:queueDataFile.length();單位字節(jié)
message.setOffsetBegin(queueDataFile.length() + 4);
message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
// 4.寫入消息到數(shù)據(jù)文件,追加到數(shù)據(jù)文件末尾
try(OutputStream outputStream = new FileOutputStream(queueDataFile,true)){
try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
// 先寫當(dāng)前消息的長度,占據(jù)4個(gè)字節(jié)
// writeInt()方法用于將給定的整數(shù)值作為4個(gè)字節(jié)(即32位)寫入基本DataOutputStream,并且成功執(zhí)行時(shí)變量計(jì)數(shù)器加4。
dataOutputStream.writeInt(messageBinary.length);
// 寫入消息本體
dataOutputStream.write(messageBinary);
}
}
// 5.更新消息統(tǒng)計(jì)文件
Stat stat = readStat(queue.getName());
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(),stat);
}
}
?? 7、刪除文件中的消息
這里就是邏輯刪除:將isValid設(shè)置為0.
主要分為3步:
? ? ? ? (1)把文件中需要刪除的一段數(shù)據(jù)讀出來,
? ? ? ? (2)還原回Message對象(反序列化);
? ? ? ? (3)把isValid改為0;
? ? ? ? (4)將上面的數(shù)據(jù)又寫回到文件。
? ? ? ? (5)更新統(tǒng)計(jì)文件
這里的message對象,必須要包含offsetBegin和offsetEnd。因?yàn)檫@里是對文件中指定的位置進(jìn)行讀寫的(把這個(gè)隨機(jī)訪問)。隨機(jī)訪問用到的類RandomAcessFile。
關(guān)于RandomAcessFile.seek()是用于設(shè)置文件指針(相當(dāng)于光標(biāo))位置,設(shè)置后,光標(biāo)會從當(dāng)前指針的下一位讀取到或?qū)懭氲健?
// 刪除消息的方法
public void deleteMessage(MSGQueue queue,Message message) throws IOException,ClassNotFoundException{
synchronized (queue){
try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){
// 1.先從文件中讀取對應(yīng)的Message數(shù)據(jù)
byte[] bufferSrc = new byte[(int)(message.getOffsetEnd() - message.getOffsetBegin())];
randomAccessFile.seek(message.getOffsetBegin());
randomAccessFile.read(bufferSrc);
// 2.把當(dāng)前讀出來的二進(jìn)制數(shù)據(jù),轉(zhuǎn)回成Message對象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
// 3.把isValid設(shè)置為無效
diskMessage.setIsValid((byte) 0x0);
// 4.重新寫入文件
byte[] buffserDest = BinaryTool.toBytes(diskMessage);
// 這里還需要設(shè)置光標(biāo)的位置,因?yàn)?上面的光標(biāo)已經(jīng)隨著讀出數(shù)據(jù)而發(fā)生了改變,已經(jīng)走到了下一條message的offsetBegin,
// 這里為了重新寫入數(shù)據(jù)到文件中,就需要將光標(biāo)移到對應(yīng)的位置上面
randomAccessFile.seek(message.getOffsetBegin());
randomAccessFile.write(buffserDest);
// 5.統(tǒng)計(jì)文件-1
// 因?yàn)橛幸粭l數(shù)據(jù)無效了
Stat stat = readStat(queue.getName());
if (stat.validCount > 0){
stat.validCount -= 1;
}
writeStat(queue.getName(),stat);
}
}
}
?? 8、將硬盤中的數(shù)據(jù)加載到內(nèi)存中
將數(shù)據(jù)從文件中,讀取出所有的消息內(nèi)容,加載到內(nèi)存當(dāng)中(放到一個(gè)鏈表中),這個(gè)方法會在程序啟動的時(shí)候調(diào)用,主要又以下幾步
????????1.讀取當(dāng)前消息的長度;
? ? ? ? 2.按照該長度,讀取消息內(nèi)容;
? ? ? ? 3.將讀取到的二進(jìn)制數(shù)據(jù),反序列化回Message對象
? ? ? ? 4.判斷消息對象是不是無效對象
? ? ? ? 4.將有效的Message對象插入到鏈表中
// 將數(shù)據(jù)從文件中,讀取出所有的消息內(nèi)容,加載到內(nèi)存當(dāng)中(放到一個(gè)鏈表中)
// 由于該方法實(shí)在程序啟動時(shí)調(diào)用, 此時(shí)服務(wù)器還不能處理請求,不涉及線程操作文件.
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException,MqException,ClassNotFoundException {
LinkedList<Message> messages = new LinkedList<>();
try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
// 該變量記錄當(dāng)前文件光標(biāo)位置,初始位置為0
long currentOffset = 0;
// 一個(gè)文件中包含了很多消息,這里要循環(huán)讀取
while (true){
// 1.讀取當(dāng)前消息的長度,這里可能會讀到文件末尾
// reaIn()方法讀到文件末尾,會拋出EOFException異常
// readInt()讀取出4個(gè)字節(jié)
int messageSize = dataInputStream.readInt();
// 2.按照這個(gè)長度,讀取消息內(nèi)容
// buffer是一個(gè)盛放消息容器,和消息的長度一般大小
byte[] buffer = new byte[messageSize];
int actualSize = dataInputStream.read(buffer);
if (messageSize != actualSize){
// 如果不匹配,說明文件有問題,格式錯(cuò)亂了
throw new MqException("[MessageFileManager] 文件格式錯(cuò)誤!queueName = " + queueName);
}
// 3.將讀取到的二進(jìn)制數(shù)據(jù),反序列化回Message對象
Message message = (Message) BinaryTool.fromBytes(buffer);
// 4.判定這個(gè)消息對象是不是無效對象
if(message.getIsValid() != 0x1){
// 無效數(shù)據(jù),直接跳過
// 雖然消息是無效消息,但是offset要更新
currentOffset += (4 + messageSize);
continue;
}
// 有效數(shù)據(jù),則需要把這個(gè)Message對象加入到鏈表中,加入前還需要填寫offsetBegin和offsetEnd;
// 進(jìn)行offset的時(shí)候,需要知道當(dāng)前光標(biāo)的位置,由于當(dāng)下使用的DataInputStream并不方便計(jì)算光標(biāo)位置
// 因此這里手動計(jì)算文件光標(biāo)位置
message.setOffsetBegin(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
messages.add(message);
}
} catch (EOFException e){
System.out.println("[MessageFileManager]恢復(fù)Message數(shù)據(jù)完成");
}
}
return messages;
}
?? 9、實(shí)現(xiàn)消息文件的垃圾回收
為什么要實(shí)現(xiàn)垃圾回收?(GC)
????????由于當(dāng)前會不停的往消息文件中寫入新消息,而且刪除也只是邏輯刪除(isValid),這樣就可能導(dǎo)致消息文件越來越大,并且里面又包含了大量的無效消息。
此處的垃圾回收,使用的是復(fù)制算法:
? ? ? ? 判斷,當(dāng)文件中消息總數(shù)超過了2000,并且有效消息的數(shù)目不足50%時(shí),就觸發(fā)垃圾回收。然后將文件中有效的消息復(fù)制出來,單獨(dú)寫入到一個(gè)新的文件中,刪除舊文件,使用新文件代替。
?????????? 檢測是否要進(jìn)行GC
檢查當(dāng)前是否要針對該隊(duì)列的消息數(shù)據(jù)文件進(jìn)行GC,判斷是否要GC,根據(jù)總消息數(shù)目和有效消息數(shù)目判斷。
// 檢查當(dāng)前是否要針對該隊(duì)列的消息數(shù)據(jù)文件進(jìn)行GC
public boolean checkGC(String queueName){
// 判斷是否要GC,根據(jù)總消息數(shù)目和有效消息數(shù)目,這兩個(gè)值都是在消息統(tǒng)計(jì)文件中實(shí)現(xiàn)的。
Stat stat = readStat(queueName);
if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){
return true;
}
return false;
}
? ? ? ? ?? 構(gòu)造新目錄
構(gòu)造一個(gè)新目錄,放置有效的復(fù)制信息。
// 構(gòu)造一個(gè)目錄結(jié)構(gòu)放置復(fù)制的信息
private String getQueueDataNewPath(String queueName){
return getQueueDir(queueName) + "/queue_data_new.txt";
}
? ? ? ? ?? 進(jìn)行GC操作
執(zhí)行消息數(shù)據(jù)文件的垃圾回收操作,使用復(fù)制算法完成,主要分為以下幾步:
? ? ? ? (1)創(chuàng)建一個(gè)新的文件queue_data_new.txt
? ? ? ? (2)從舊文件中讀取出所有的有效消息對象
? ? ? ? (3)把有效消息寫入到queue_data_new.txt
? ? ? ? (4)刪出舊的文件,并把新的文件重命名(queue_data_new.txt => queue_data.txt)
? ? ? ? (5)更新統(tǒng)計(jì)文件
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
// 進(jìn)行g(shù)c的時(shí)候,是針對消息數(shù)據(jù)文件作出整體性的一個(gè)操作,在這個(gè)過程中,
// 進(jìn)行加鎖操作,讓其他線程不能對該隊(duì)列的消息文件作出任何修改
synchronized (queue) {
// 由于 gc 操作可能比較耗時(shí), 此處統(tǒng)計(jì)一下執(zhí)行消耗的時(shí)間.
long gcBegin = System.currentTimeMillis();
// 1. 創(chuàng)建一個(gè)新的文件
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if (queueDataNewFile.exists()) {
// 正常情況下, 這個(gè)文件不應(yīng)該存在. 如果存在, 就是意外~~ 說明上次 gc 了一半, 程序意外崩潰了.
throw new MqException("[MessageFileManager] gc 時(shí)發(fā)現(xiàn)該隊(duì)列的 queue_data_new 已經(jīng)存在! queueName=" + queue.getName());
}
boolean ok = queueDataNewFile.createNewFile();
if (!ok) {
throw new MqException("[MessageFileManager] 創(chuàng)建文件失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
}
// 2. 從舊的文件中, 讀取出所有的有效消息對象了. (這個(gè)邏輯直接調(diào)用上述方法即可, 不必重新寫了)
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
// 3. 把有效消息, 寫入到新的文件中.
try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
for (Message message : messages) {
byte[] buffer = BinaryTool.toBytes(message);
// 先寫四個(gè)字節(jié)消息的長度
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
// 4. 刪除舊的數(shù)據(jù)文件, 并且把新的文件進(jìn)行重命名
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
ok = queueDataOldFile.delete();
if (!ok) {
throw new MqException("[MessageFileManager] 刪除舊的數(shù)據(jù)文件失敗! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
// 把 queue_data_new.txt => queue_data.txt
ok = queueDataNewFile.renameTo(queueDataOldFile);
if (!ok) {
throw new MqException("[MessageFileManager] 文件重命名失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
// 5. 更新統(tǒng)計(jì)文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(), stat);
long gcEnd = System.currentTimeMillis();
System.out.println("[MessageFileManager] gc 執(zhí)行完畢! queueName=" + queue.getName() + ", time="
+ (gcEnd - gcBegin) + "ms");
}
}
五、測試MessageFileManager類
?? 1、“準(zhǔn)備工作”和“收尾工作”
創(chuàng)建MessagefileManagerTests?
@SpringBootTest
public class MessageFileManagerTests {
private MessageFileManger messageFileManger = new MessageFileManger();
private static final String queueName1 = "testQueue1";
private static final String queueName2 = "testQueue2";
// 每個(gè)用例執(zhí)行之前的準(zhǔn)備工作
@BeforeEach
public void setUp() throws IOException {
// 準(zhǔn)備階段,創(chuàng)建出兩個(gè)隊(duì)列,以備后用
messageFileManger.createQueueFiles(queueName1);
messageFileManger.createQueueFiles(queueName2);
}
// 每個(gè)用例執(zhí)行之后的收尾工作
@AfterEach
public void tearDown() throws IOException {
// 收尾階段,把創(chuàng)建出的隊(duì)列銷毀掉
messageFileManger.destroyQueueFiles(queueName1);
messageFileManger.destroyQueueFiles(queueName2);
}
}
?? 2、測試創(chuàng)建文件是否存在
@Test
public void testCreateFiles(){
// 創(chuàng)建隊(duì)列文件在準(zhǔn)備工作已經(jīng)執(zhí)行過了,這里主要是為了驗(yàn)證文件是否存在
File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
// assertEquals(預(yù)期值,實(shí)際值)
Assertions.assertEquals(true,queueDataFile1.isFile());
File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
Assertions.assertEquals(true,queueStatFile1.isFile());
File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
Assertions.assertEquals(true,queueDataFile2.isFile());
File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
Assertions.assertEquals(true,queueStatFile2.isFile());
}
這里為了方便查看文件是否創(chuàng)建,就把收尾工作注釋掉了
?
??? 3、測試writetStat和readStat是否能夠通過
@Test
public void testReadWriteStat(){
MessageFileManger.Stat stat = new MessageFileManger.Stat();
stat.totalCount = 100;
stat.validCount =50;
// 由于writeStat和readStat是私有方法,此處就需要使用反射的方式
// 使用Spring封裝好的反射的工具類
ReflectionTestUtils.invokeMethod(messageFileManger,"writeStat", queueName1,stat);
// 寫入完畢之后,調(diào)用讀取,驗(yàn)證讀取的結(jié)果和寫入的數(shù)據(jù)是一致的
MessageFileManger.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);
Assertions.assertEquals(100,newStat.totalCount);
Assertions.assertEquals(50,newStat.validCount);
System.out.println("writetStat和readStat測試通過");
}
??? 4、測試sendMessage
構(gòu)造創(chuàng)建queue和message的方法:
private MSGQueue createTestQueue(String queueName){
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(true); //是否要持久化
return queue;
}
// 構(gòu)造出一條消息
private Message createTestMessage(String content){
Message message = Message.createMessageWithId("testRoutingKey",null,content.getBytes());
return message;
}
測試sendMessage:
@Test
public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
// 構(gòu)造出消息,并且構(gòu)造出隊(duì)列
Message message = createTestMessage("testMessage");
// 創(chuàng)建queue對象
MSGQueue queue = createTestQueue(queueName1);
// 調(diào)用發(fā)送消息的方法
messageFileManger.sendMessage(queue,message);
// 檢查stat文件
MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);
Assertions.assertEquals(1,stat.totalCount);
Assertions.assertEquals(1,stat.validCount);
// 檢查文件,把消息讀出來
LinkedList<Message> messages = messageFileManger.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(1,messages.size());
Message curMessage = messages.get(0);
Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());
Assertions.assertEquals(message.getDeliverMode(),curMessage.getDeliverMode());
// 比較兩個(gè)字節(jié)數(shù)組的內(nèi)容是否相同,不能直接使用asserEquals
Assertions.assertArrayEquals(message.getBody(),curMessage.getBody());
System.out.println("message = "+ curMessage);
}
?
構(gòu)造100條消息,?并且讀取出來
@Test
public void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
// 往隊(duì)列中插入100條消息,驗(yàn)證100條消息從文件中讀取之后,是否和最初是一致的
MSGQueue queue = createTestQueue(queueName1);
List<Message> expectedMessages = new LinkedList<>();
for (int i = 0; i < 100; i++) {
Message message = createTestMessage("testMessge" + 1);
messageFileManger.sendMessage(queue,message);
expectedMessages.add(message);
}
// 讀取所有消息
LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(expectedMessages.size(),actualMessages.size());
for (int i = 0; i < expectedMessages.size(); i++) {
Message expectedMessage = expectedMessages.get(i);
Message actualMessage = actualMessages.get(i);
System.out.println("[" + i + "]actualMessage = " + actualMessages);
Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());
Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());
Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());
Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());
Assertions.assertEquals(0x1,actualMessage.getIsValid());
}
}
?
?? 5、測試刪除消息
// 測試刪除消息
@Test
public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
// 創(chuàng)建隊(duì)列, 寫入 10 個(gè)消息. 刪除其中的幾個(gè)消息. 再把所有消息讀取出來, 判定是否符合預(yù)期.
MSGQueue queue = createTestQueue(queueName1);
List<Message> expectedMessages = new LinkedList<>();
for (int i = 0; i < 10; i++) {
Message message = createTestMessage("testMessage" + i);
messageFileManager.sendMessage(queue, message);
expectedMessages.add(message);
}
// 刪除其中的三個(gè)消息
messageFileManager.deleteMessage(queue, expectedMessages.get(7));
messageFileManager.deleteMessage(queue, expectedMessages.get(8));
messageFileManager.deleteMessage(queue, expectedMessages.get(9));
// 對比這里的內(nèi)容是否正確.
LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(7, actualMessages.size());
for (int i = 0; i < actualMessages.size(); i++) {
Message expectedMessage = expectedMessages.get(i);
Message actualMessage = actualMessages.get(i);
System.out.println("[" + i + "] actualMessage=" + actualMessage);
Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
Assertions.assertEquals(0x1, actualMessage.getIsValid());
}
}
?? 6、測試?yán)厥?/h3>
@Test
public void testGC() throws IOException, MqException, ClassNotFoundException {
// 先往隊(duì)列中寫 100 個(gè)消息. 獲取到文件大小.
// 再把 100 個(gè)消息中的一半, 都給刪除掉(比如把下標(biāo)為偶數(shù)的消息都刪除)
// 再手動調(diào)用 gc 方法, 檢測得到的新的文件的大小是否比之前縮小了.
MSGQueue queue = createTestQueue(queueName1);
List<Message> expectedMessages = new LinkedList<>();
for (int i = 0; i < 100; i++) {
Message message = createTestMessage("testMessage" + i);
messageFileManager.sendMessage(queue, message);
expectedMessages.add(message);
}
// 獲取 gc 前的文件大小
File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
long beforeGCLength = beforeGCFile.length();
// 刪除偶數(shù)下標(biāo)的消息
for (int i = 0; i < 100; i += 2) {
messageFileManager.deleteMessage(queue, expectedMessages.get(i));
}
// 手動調(diào)用 gc
messageFileManager.gc(queue);
// 重新讀取文件, 驗(yàn)證新的文件的內(nèi)容是不是和之前的內(nèi)容匹配
LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(50, actualMessages.size());
for (int i = 0; i < actualMessages.size(); i++) {
// 把之前消息偶數(shù)下標(biāo)的刪了, 剩下的就是奇數(shù)下標(biāo)的元素了.
// actual 中的 0 對應(yīng) expected 的 1
// actual 中的 1 對應(yīng) expected 的 3
// actual 中的 2 對應(yīng) expected 的 5
// actual 中的 i 對應(yīng) expected 的 2 * i + 1
Message expectedMessage = expectedMessages.get(2 * i + 1);
Message actualMessage = actualMessages.get(i);
Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
Assertions.assertEquals(0x1, actualMessage.getIsValid());
}
// 獲取新的文件的大小
File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
long afterGCLength = afterGCFile.length();
System.out.println("before: " + beforeGCLength);
System.out.println("after: " + afterGCLength);
Assertions.assertTrue(beforeGCLength > afterGCLength);
}
@Test
public void testGC() throws IOException, MqException, ClassNotFoundException {
// 先往隊(duì)列中寫 100 個(gè)消息. 獲取到文件大小.
// 再把 100 個(gè)消息中的一半, 都給刪除掉(比如把下標(biāo)為偶數(shù)的消息都刪除)
// 再手動調(diào)用 gc 方法, 檢測得到的新的文件的大小是否比之前縮小了.
MSGQueue queue = createTestQueue(queueName1);
List<Message> expectedMessages = new LinkedList<>();
for (int i = 0; i < 100; i++) {
Message message = createTestMessage("testMessage" + i);
messageFileManager.sendMessage(queue, message);
expectedMessages.add(message);
}
// 獲取 gc 前的文件大小
File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
long beforeGCLength = beforeGCFile.length();
// 刪除偶數(shù)下標(biāo)的消息
for (int i = 0; i < 100; i += 2) {
messageFileManager.deleteMessage(queue, expectedMessages.get(i));
}
// 手動調(diào)用 gc
messageFileManager.gc(queue);
// 重新讀取文件, 驗(yàn)證新的文件的內(nèi)容是不是和之前的內(nèi)容匹配
LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(50, actualMessages.size());
for (int i = 0; i < actualMessages.size(); i++) {
// 把之前消息偶數(shù)下標(biāo)的刪了, 剩下的就是奇數(shù)下標(biāo)的元素了.
// actual 中的 0 對應(yīng) expected 的 1
// actual 中的 1 對應(yīng) expected 的 3
// actual 中的 2 對應(yīng) expected 的 5
// actual 中的 i 對應(yīng) expected 的 2 * i + 1
Message expectedMessage = expectedMessages.get(2 * i + 1);
Message actualMessage = actualMessages.get(i);
Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
Assertions.assertEquals(0x1, actualMessage.getIsValid());
}
// 獲取新的文件的大小
File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
long afterGCLength = afterGCFile.length();
System.out.println("before: " + beforeGCLength);
System.out.println("after: " + afterGCLength);
Assertions.assertTrue(beforeGCLength > afterGCLength);
}
六、小結(jié)
MessageFileManager主要是負(fù)責(zé)管理消息在文件中的存儲:
? ? ? ? (1)設(shè)計(jì)了目錄結(jié)構(gòu)和文件格式
? ? ? ? (2)實(shí)現(xiàn)了目錄創(chuàng)建和刪除
? ? ? ? (3)實(shí)現(xiàn)了統(tǒng)計(jì)文件的讀寫
? ? ? ? (4)實(shí)現(xiàn)了消息的寫入
? ? ? ? (5)實(shí)現(xiàn)了消息的刪除
????????(6)實(shí)現(xiàn)了加載所有消息文章來源:http://www.zghlxwxcb.cn/news/detail-634917.html
????????(7)垃圾回收文章來源地址http://www.zghlxwxcb.cn/news/detail-634917.html
到了這里,關(guān)于項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化}的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!