国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化}

這篇具有很好參考價(jià)值的文章主要介紹了項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化}。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

?一、消息存儲格式設(shè)計(jì)

? ? ? ??? 1、queue_data.txt:保存消息的內(nèi)容

? ? ? ? ?? 2、queue_stat.txt:保存消息的統(tǒng)計(jì)信息

二、消息序列化

三、自定義異常類

四、創(chuàng)建MessageFileManger類

?? 1、約定消息文件所在的目錄和文件名字

??? 2、隊(duì)列的統(tǒng)計(jì)信息

?? 3、創(chuàng)建隊(duì)列對應(yīng)的目錄和功能

?? 4、實(shí)現(xiàn)刪除隊(duì)列的文件和目錄

?? 5、檢查隊(duì)列的目錄和文件是否都存在

?? 6、把消息寫入到文件中

?? 7、刪除文件中的消息

?? 8、將硬盤中的數(shù)據(jù)加載到內(nèi)存中

?? 9、實(shí)現(xiàn)消息文件的垃圾回收

?????????? 檢測是否要進(jìn)行GC

? ? ? ? ?? 構(gòu)造新目錄

? ? ? ? ?? 進(jìn)行GC操作

五、測試MessageFileManager類

?? 1、“準(zhǔn)備工作”和“收尾工作”

?? 2、測試創(chuàng)建文件是否存在

??? 3、測試writetStat和readStat是否能夠通過

??? 4、測試sendMessage

?? 5、測試刪除消息

?? 6、測試?yán)厥?/p>

六、小結(jié)


項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

?一、消息存儲格式設(shè)計(jì)

對于消息,并不打算存儲在數(shù)據(jù)庫中:

????????(1)消息操作并不會涉及到復(fù)雜的增刪改查

????????(2)消息的數(shù)量可能會非常多,數(shù)據(jù)庫的訪問效率并不高

?所以,我們直接把消息存儲在文件中。

那么消息要如何在文件中存儲呢?

首先消息,它是依附于隊(duì)列的,所以在存儲的時(shí)候,就把消息按照隊(duì)列的維度展開。

我們會將隊(duì)列存儲在和數(shù)據(jù)庫同級的data目錄中,在data中創(chuàng)建一些子目錄,子目錄的名字就是隊(duì)列名。項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

然后在每個(gè)隊(duì)列的子目錄下面,再分配兩個(gè)文件,用來存儲消息。主要是以下兩個(gè)文件。


? ? ? ??? 1、queue_data.txt:保存消息的內(nèi)容

????????Message是一個(gè)二進(jìn)制格式的文件,包含若干個(gè)消息,每個(gè)消息都以二進(jìn)制的方式存儲,每個(gè)消息都由這幾個(gè)部分構(gòu)成:Message對象序列化之后。

? ??????Messag對象,會分別再內(nèi)存和硬盤上都記錄一份。內(nèi)存中的衣服呢會記錄offsetBegin和offsetEnd。這樣就可以找到內(nèi)存中的Message對象,能夠找到對應(yīng)的硬盤上的message對象。

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

????????關(guān)于isValid:是用來標(biāo)識當(dāng)前消息在文件中是否有效,為1就是有效的消息,為0就是無效的消息。當(dāng)為0時(shí)就相當(dāng)于邏輯上的刪除消息功能。但是,隨著時(shí)間的推移,消息的增多,那么該消息可能就會大部分都是無效的消息,針對這種情況,就需要對當(dāng)前消息的文件,進(jìn)行垃圾回收。

以下是本程序中實(shí)現(xiàn)的垃圾回收功能:? ?

? 垃圾回收(GC):使用復(fù)制算法,針對消息數(shù)據(jù)文件中的垃圾回收進(jìn)行回收。直接遍歷原有的消息數(shù)據(jù)文件,把有效的數(shù)據(jù)拷貝到一個(gè)新的文件中,然后把之前的舊文件都刪除掉(邏輯刪除)。

作出約定(可以不按這個(gè)來),當(dāng)總消息數(shù)目超過了2000,并且有效的消息數(shù)目低于總數(shù)目的50%,就出發(fā)一次垃圾回收。

?對于RabbitMQ解決垃圾回收的方式如下:

?如果某個(gè)消息隊(duì)列中,消息很多,都是有效消息,就會導(dǎo)致整個(gè)消息的數(shù)據(jù)文件特別答,后續(xù)針對文件的各種操作,成本就會很高。RabbitMQ解決方案如下:

? ? 文件拆分:當(dāng)單個(gè)文件長度達(dá)到一定閾值以后,就會拆分成兩個(gè)文件(拆分次數(shù)越多文件就越多)

? ? 文件合并:每個(gè)單獨(dú)的文件都會進(jìn)行GC,如果GC之后,就會發(fā)現(xiàn)文件變小很多,當(dāng)小到一定程度,就會和其他文件合并。

這樣就會再消息特別多的時(shí)候,也能保證性能上的及時(shí)響應(yīng)。


? ? ? ? ?? 2、queue_stat.txt:保存消息的統(tǒng)計(jì)信息

????????使用這個(gè)文件,來保存消息的統(tǒng)計(jì)信息。

????????只存一行數(shù)據(jù),文本格式。然后一行包括兩列:兩者使用 \t 來分割,形如2000\t500

? ? ? ? ????????第一列是queue_data.txt中總的消息的數(shù)目

? ? ? ????????? 第二列是queue_data.txt中有效消息的數(shù)目。


二、消息序列化

消息序列化:把一個(gè)對象(結(jié)構(gòu)化數(shù)據(jù))轉(zhuǎn)成一個(gè)字符串/字節(jié)數(shù)組。

在序列化之后,對象的信息是不會丟失的,這樣就會方便與存儲和傳輸(在文件中存儲時(shí),只能以字符串/二進(jìn)制數(shù)據(jù)的方式存儲對象)。后面需要用的時(shí)候,就再反序列化。

由于消息的body是二進(jìn)制數(shù)據(jù),所以這里不會使用JSON進(jìn)行序列化。

針對二進(jìn)制序列化,有很多解決方案:????????

? ? ? ? (1)Java標(biāo)準(zhǔn)庫中提供了序列化的方案:ObjectInputStream 和?ObjectOutputStream

? ? ? ? (2)Hession

? ? ? ? (3)protobuffer

? ? ? ? (4)thrift

這里使用第一種,這樣就不用再引入額外的依賴。

在commen中創(chuàng)建一個(gè)BinaryTool,因?yàn)楹竺婵蛻舳诉€會用到這個(gè)類,所以放在公共的包中。

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目


/*
* 序列化和反序列化
* 實(shí)現(xiàn)Serializable接口才能讓這個(gè)對象進(jìn)行序列化和反序列化
* */
public class BinaryTool {
    //    把一個(gè)對象序列化成一個(gè)字節(jié)數(shù)組
    public static byte[] toBytes(Object object) throws IOException {
//        這個(gè)流對象相當(dāng)于一個(gè)邊長的字節(jié)數(shù)組
//        就可以把object序列化的數(shù)組逐漸寫入到byteArrayOutputStream中,然后統(tǒng)一轉(zhuǎn)成byte[]
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
//                此處的writeObject就會把該對象進(jìn)行序列化,生成二進(jìn)制字節(jié)數(shù)據(jù),就會寫入到ObjectOutputStream中
//                由于ObjectOutputStream又關(guān)聯(lián)到了ByteArrayOutputStream,最終結(jié)果就寫入到ByteArrayOutputStream里面了
                objectOutputStream.writeObject(object);
            }
//            該操作就是把byteArrayOutputStream中持有的二進(jìn)制數(shù)據(jù)取出來,轉(zhuǎn)成byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }

    //  把一個(gè)字節(jié)數(shù)組,反序列化成一個(gè)對象
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
//                此處的readObject,就是從data的byte[]中讀取數(shù)據(jù)并且進(jìn)行反序列化
                object = objectInputStream.readObject();
            }
        }
        return object;
    }
}

三、自定義異常類

?項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

自定義一個(gè)異常類,如果是mq的業(yè)務(wù)邏輯中出的異常,就拋出這個(gè)異常類

/*
*自定義異常類
*/
public class MqException extends Exception{
    public MqException(String reason){
        super(reason);
    }
}

四、創(chuàng)建MessageFileManger類

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

?對硬盤上的消息進(jìn)行管理的類。


?? 1、約定消息文件所在的目錄和文件名字

//這里的init()方法暫時(shí)不用,只是列在這,后面可能擴(kuò)展 
public void init(){
//        后續(xù)擴(kuò)展
    }

//    約定的那個(gè)消息文件所在的目錄和文件名

//    用來獲取到指定隊(duì)列對應(yīng)的消息文件所在的路徑
    private  String getQueueDir(String queueName){
        return "./data/" + queueName;
    }

//    用來獲取該隊(duì)列的消息數(shù)據(jù)文件路徑
    private String getQueueDataPath(String queueName){
        return getQueueDir(queueName) + "/queue_data.txt";
    }

//    用來獲取該隊(duì)列列的消息統(tǒng)計(jì)文件路徑
    private String getQueueStatPath(String queueName){
        return getQueueDir(queueName) + "/queue_stat.txt";
    }

??? 2、隊(duì)列的統(tǒng)計(jì)信息

定義一個(gè)內(nèi)部類,來表示該隊(duì)列的統(tǒng)計(jì)信息:

//    定義一個(gè)內(nèi)部類,來表示該隊(duì)列的統(tǒng)計(jì)信息
    static public class Stat{
//        直接定義成public
        public int totalCount; //總消息數(shù)量
        public int validCount;//有效消息的數(shù)量
    }

然后實(shí)現(xiàn)消息統(tǒng)計(jì)的讀寫功能:

private Stat readStat(String queueName){
//        由于當(dāng)前的消息統(tǒng)計(jì)文件是文本文件,直接使用scanner讀取文件內(nèi)容
        Stat stat = new Stat();
        try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    private void writeStat(String queueName,Stat stat){
//      使用PrinWrite來寫
//        OutputStream打開文件,默認(rèn)情況下,會直接把源文件清空,新數(shù)據(jù)會覆蓋原數(shù)據(jù)
//        這里直接覆蓋就可以了
        try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount + "\t" + stat.validCount);
            printWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

?? 3、創(chuàng)建隊(duì)列對應(yīng)的目錄和功能

//    創(chuàng)建隊(duì)列對應(yīng)的文件和目錄
    public void createQueueFiles(String queueName) throws IOException {
//        1.創(chuàng)建隊(duì)列對應(yīng)的消息目錄
        File baseDir = new File(getQueueDir(queueName));
        if (!baseDir.exists()){
//            不存在,就創(chuàng)建這個(gè)目錄
            boolean ok = baseDir.mkdirs();
            if (!ok){
                throw  new IOException("創(chuàng)建目錄失??!baseDir = " + baseDir.getAbsolutePath());
            }
        }

//        2.創(chuàng)建隊(duì)列數(shù)據(jù)文件
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()){
            boolean ok = queueDataFile.createNewFile();
            if (!ok){
                throw new IOException("創(chuàng)建文件失??!queueDataFile = " + queueDataFile.getAbsolutePath());
            }
        }
//       3.創(chuàng)建消息統(tǒng)計(jì)文件
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()){
            boolean ok = queueStatFile.createNewFile();
            if (!ok){
                throw new IOException("創(chuàng)建文件失?。ueueStatFile = " + queueStatFile.getAbsolutePath());
            }
        }

//        4.給消息統(tǒng)計(jì)文件,設(shè)定初始值 0\t0
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        writeStat(queueName,stat);
    }

?? 4、實(shí)現(xiàn)刪除隊(duì)列的文件和目錄

//    實(shí)現(xiàn)刪除隊(duì)列的文件和目錄
    public void destroyQueueFiles(String queueName) throws IOException {
//        先刪除文件,在刪除目錄
        File queueDataFile = new File(getQueueDataPath(queueName));
        boolean ok1 = queueDataFile.delete();
        File queueStatFile = new File(getQueueStatPath(queueName));
        boolean ok2 = queueStatFile.delete();
        File baseDir = new File(getQueueDir(queueName));
        boolean ok3 = baseDir.delete();
        if (!ok1 || !ok2 || !ok3){
//            有任意一個(gè)刪除失敗,就算整體刪除失敗
            throw new IOException("刪除隊(duì)列目錄和文件失??!baseDir = " + baseDir.getAbsolutePath());
        }
    }

?? 5、檢查隊(duì)列的目錄和文件是否都存在

后續(xù)有生產(chǎn)者給brocker server生產(chǎn)消息了,這個(gè)消息就可能需要記錄到文件上(取決消息是否要持久化)。但是判斷是否要持久化之前,需要檢查隊(duì)列中的文件是否存在。

//    檢查隊(duì)列的目錄和文件是否存在
    public boolean checkFileExists(String queueName){
//        判斷隊(duì)列的數(shù)據(jù)文件和統(tǒng)計(jì)文件是否都存在
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()){
            return false;
        }
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()){
            return false;
        }
        return true;
    }

?? 6、把消息寫入到文件中

把一個(gè)新的消息,放到隊(duì)列對應(yīng)的文件中。主要包含以下三步:

(1)檢查寫入的隊(duì)列是否存在;

(2)把Message對象進(jìn)行序列化,轉(zhuǎn)成二進(jìn)制的字節(jié)數(shù)組;

(3)獲取到當(dāng)前數(shù)據(jù)的文件長度,使用[offsetBegin,offsetEnd]。把新的Message數(shù)據(jù),寫入到隊(duì)列數(shù)據(jù)文件的末尾,此時(shí),

? ?Message對象的offsetBegin,就是當(dāng)前文件長度 +4

???offsetEnd就是當(dāng)前文件長度 + 4 + message自身長度

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

(4)寫入消息到數(shù)據(jù)文件,追加到數(shù)據(jù)文件末尾

(5)更新消息統(tǒng)計(jì)文件

寫入文件時(shí)的線程安全問題:

* 如果兩個(gè)線程,是往同一個(gè)隊(duì)列中寫消息,此時(shí)就需要阻塞等待;

假設(shè)現(xiàn)在有兩個(gè)線程t1,t2。如果沒有加鎖,那么他們的目的就是將一個(gè)message寫入到104~124之間去。但是,此時(shí)可能就會導(dǎo)致t1計(jì)算長度以后,沒有進(jìn)行寫文件;t2就開始計(jì)算長度了,并且執(zhí)行了寫文件操作,寫完以后,t1才開始寫,但是此時(shí)t1就不是從104寫了,而是從124開始寫。這樣會導(dǎo)致queue_data多出一段。

?項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

?所以這里我們就需要對隊(duì)列進(jìn)行加鎖。

* 如果兩個(gè)線程,需要往不同的隊(duì)列中些消息,此時(shí)就不需要阻塞等待。

總體代碼如下:

//    該方法用于把一個(gè)新的消息,放到隊(duì)列對應(yīng)的文件中
//    queue表示要把消息寫入的隊(duì)列,message則是要寫的消息
    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
//        1、檢查要寫入的隊(duì)列是否存在
//        如果不存在
        if (!checkFileExists(queue.getName())){
            throw new MqException("[MessageFileManager] 隊(duì)列對應(yīng)的文件不存在!queueName = " + queue.getName());
        }

//        2、把Message對象進(jìn)行序列化,轉(zhuǎn)成二進(jìn)制的字節(jié)數(shù)組
        byte[] messageBinary = BinaryTool.toBytes(message);

//        這個(gè)鎖是,當(dāng)有兩個(gè)對象針對同一個(gè)對象操作時(shí),鎖才會有效
        synchronized (queue){
//        3、先獲取到當(dāng)前的隊(duì)列數(shù)據(jù)文件長度,使用[offsetBegin,offsetEnd]
//        把新的Message數(shù)據(jù),寫入到隊(duì)列數(shù)據(jù)文件的末尾,此時(shí)Message對象的offsetBegin,就是當(dāng)前文件長度+4
//        offsetEnd就是當(dāng)前文件長度 + 4 + message自身長度
            File queueDataFile = new File(getQueueDataPath(queue.getName()));
//        獲取到文件的長度:queueDataFile.length();單位字節(jié)
            message.setOffsetBegin(queueDataFile.length() + 4);
            message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);

//        4.寫入消息到數(shù)據(jù)文件,追加到數(shù)據(jù)文件末尾
            try(OutputStream outputStream = new FileOutputStream(queueDataFile,true)){
                try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
//                先寫當(dāng)前消息的長度,占據(jù)4個(gè)字節(jié)
//                writeInt()方法用于將給定的整數(shù)值作為4個(gè)字節(jié)(即32位)寫入基本DataOutputStream,并且成功執(zhí)行時(shí)變量計(jì)數(shù)器加4。
                    dataOutputStream.writeInt(messageBinary.length);
//                寫入消息本體
                    dataOutputStream.write(messageBinary);
                }
            }

//        5.更新消息統(tǒng)計(jì)文件
            Stat stat = readStat(queue.getName());
            stat.totalCount += 1;
            stat.validCount += 1;
            writeStat(queue.getName(),stat);
        }
    }

?? 7、刪除文件中的消息

這里就是邏輯刪除:將isValid設(shè)置為0.

主要分為3步:

? ? ? ? (1)把文件中需要刪除的一段數(shù)據(jù)讀出來,

? ? ? ? (2)還原回Message對象(反序列化);

? ? ? ? (3)把isValid改為0;

? ? ? ? (4)將上面的數(shù)據(jù)又寫回到文件。

? ? ? ? (5)更新統(tǒng)計(jì)文件

這里的message對象,必須要包含offsetBegin和offsetEnd。因?yàn)檫@里是對文件中指定的位置進(jìn)行讀寫的(把這個(gè)隨機(jī)訪問)。隨機(jī)訪問用到的類RandomAcessFile。

關(guān)于RandomAcessFile.seek()是用于設(shè)置文件指針(相當(dāng)于光標(biāo))位置,設(shè)置后,光標(biāo)會從當(dāng)前指針的下一位讀取到或?qū)懭氲健?

//    刪除消息的方法
    public void deleteMessage(MSGQueue queue,Message message) throws IOException,ClassNotFoundException{
        synchronized (queue){
            try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){
//            1.先從文件中讀取對應(yīng)的Message數(shù)據(jù)
                byte[] bufferSrc = new byte[(int)(message.getOffsetEnd() - message.getOffsetBegin())];
                randomAccessFile.seek(message.getOffsetBegin());
                randomAccessFile.read(bufferSrc);
//            2.把當(dāng)前讀出來的二進(jìn)制數(shù)據(jù),轉(zhuǎn)回成Message對象
                Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
//            3.把isValid設(shè)置為無效
                diskMessage.setIsValid((byte) 0x0);
//            4.重新寫入文件
                byte[] buffserDest = BinaryTool.toBytes(diskMessage);
//        這里還需要設(shè)置光標(biāo)的位置,因?yàn)?上面的光標(biāo)已經(jīng)隨著讀出數(shù)據(jù)而發(fā)生了改變,已經(jīng)走到了下一條message的offsetBegin,
//            這里為了重新寫入數(shù)據(jù)到文件中,就需要將光標(biāo)移到對應(yīng)的位置上面
                randomAccessFile.seek(message.getOffsetBegin());
                randomAccessFile.write(buffserDest);
//            5.統(tǒng)計(jì)文件-1
//            因?yàn)橛幸粭l數(shù)據(jù)無效了
                Stat stat = readStat(queue.getName());
                if (stat.validCount > 0){
                    stat.validCount -= 1;
                }
                writeStat(queue.getName(),stat);
            }
        }
    }

?? 8、將硬盤中的數(shù)據(jù)加載到內(nèi)存中

將數(shù)據(jù)從文件中,讀取出所有的消息內(nèi)容,加載到內(nèi)存當(dāng)中(放到一個(gè)鏈表中),這個(gè)方法會在程序啟動的時(shí)候調(diào)用,主要又以下幾步

????????1.讀取當(dāng)前消息的長度;

? ? ? ? 2.按照該長度,讀取消息內(nèi)容;

? ? ? ? 3.將讀取到的二進(jìn)制數(shù)據(jù),反序列化回Message對象

? ? ? ? 4.判斷消息對象是不是無效對象

? ? ? ? 4.將有效的Message對象插入到鏈表中

//   將數(shù)據(jù)從文件中,讀取出所有的消息內(nèi)容,加載到內(nèi)存當(dāng)中(放到一個(gè)鏈表中)
//    由于該方法實(shí)在程序啟動時(shí)調(diào)用, 此時(shí)服務(wù)器還不能處理請求,不涉及線程操作文件.
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException,MqException,ClassNotFoundException {
        LinkedList<Message> messages = new LinkedList<>();
        try(InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
            try(DataInputStream dataInputStream = new DataInputStream(inputStream)){
//               該變量記錄當(dāng)前文件光標(biāo)位置,初始位置為0
                long currentOffset = 0;
//                一個(gè)文件中包含了很多消息,這里要循環(huán)讀取
                while (true){
//                    1.讀取當(dāng)前消息的長度,這里可能會讀到文件末尾
//                    reaIn()方法讀到文件末尾,會拋出EOFException異常
//                    readInt()讀取出4個(gè)字節(jié)
                    int messageSize = dataInputStream.readInt();
//                    2.按照這個(gè)長度,讀取消息內(nèi)容
//                    buffer是一個(gè)盛放消息容器,和消息的長度一般大小
                    byte[] buffer = new byte[messageSize];
                    int actualSize = dataInputStream.read(buffer);
                    if (messageSize != actualSize){
//                        如果不匹配,說明文件有問題,格式錯(cuò)亂了
                        throw new MqException("[MessageFileManager] 文件格式錯(cuò)誤!queueName = " + queueName);
                    }
//                    3.將讀取到的二進(jìn)制數(shù)據(jù),反序列化回Message對象
                    Message message = (Message) BinaryTool.fromBytes(buffer);
                    //                  4.判定這個(gè)消息對象是不是無效對象
                    if(message.getIsValid() != 0x1){
//                        無效數(shù)據(jù),直接跳過
//                        雖然消息是無效消息,但是offset要更新
                        currentOffset += (4 + messageSize);
                        continue;
                    }
//                    有效數(shù)據(jù),則需要把這個(gè)Message對象加入到鏈表中,加入前還需要填寫offsetBegin和offsetEnd;
//                    進(jìn)行offset的時(shí)候,需要知道當(dāng)前光標(biāo)的位置,由于當(dāng)下使用的DataInputStream并不方便計(jì)算光標(biāo)位置
//                    因此這里手動計(jì)算文件光標(biāo)位置
                    message.setOffsetBegin(currentOffset + 4);
                    message.setOffsetEnd(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);
                }
            } catch (EOFException e){
                System.out.println("[MessageFileManager]恢復(fù)Message數(shù)據(jù)完成");
            }
        }
        return messages;
    }

?? 9、實(shí)現(xiàn)消息文件的垃圾回收

為什么要實(shí)現(xiàn)垃圾回收?(GC)

????????由于當(dāng)前會不停的往消息文件中寫入新消息,而且刪除也只是邏輯刪除(isValid),這樣就可能導(dǎo)致消息文件越來越大,并且里面又包含了大量的無效消息。

此處的垃圾回收,使用的是復(fù)制算法:

? ? ? ? 判斷,當(dāng)文件中消息總數(shù)超過了2000,并且有效消息的數(shù)目不足50%時(shí),就觸發(fā)垃圾回收。然后將文件中有效的消息復(fù)制出來,單獨(dú)寫入到一個(gè)新的文件中,刪除舊文件,使用新文件代替。

?????????? 檢測是否要進(jìn)行GC

檢查當(dāng)前是否要針對該隊(duì)列的消息數(shù)據(jù)文件進(jìn)行GC,判斷是否要GC,根據(jù)總消息數(shù)目和有效消息數(shù)目判斷。

//    檢查當(dāng)前是否要針對該隊(duì)列的消息數(shù)據(jù)文件進(jìn)行GC
    public boolean checkGC(String queueName){
//        判斷是否要GC,根據(jù)總消息數(shù)目和有效消息數(shù)目,這兩個(gè)值都是在消息統(tǒng)計(jì)文件中實(shí)現(xiàn)的。
        Stat stat = readStat(queueName);
        if (stat.totalCount > 2000 && (double)stat.validCount / (double) stat.totalCount < 0.5){
            return  true;
        }
        return false;
    }

? ? ? ? ?? 構(gòu)造新目錄

構(gòu)造一個(gè)新目錄,放置有效的復(fù)制信息。

//    構(gòu)造一個(gè)目錄結(jié)構(gòu)放置復(fù)制的信息
    private String getQueueDataNewPath(String queueName){
        return getQueueDir(queueName) + "/queue_data_new.txt";
    }

? ? ? ? ?? 進(jìn)行GC操作

執(zhí)行消息數(shù)據(jù)文件的垃圾回收操作,使用復(fù)制算法完成,主要分為以下幾步:

? ? ? ? (1)創(chuàng)建一個(gè)新的文件queue_data_new.txt

? ? ? ? (2)從舊文件中讀取出所有的有效消息對象

? ? ? ? (3)把有效消息寫入到queue_data_new.txt

? ? ? ? (4)刪出舊的文件,并把新的文件重命名(queue_data_new.txt => queue_data.txt)

? ? ? ? (5)更新統(tǒng)計(jì)文件

public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
//        進(jìn)行g(shù)c的時(shí)候,是針對消息數(shù)據(jù)文件作出整體性的一個(gè)操作,在這個(gè)過程中,
//        進(jìn)行加鎖操作,讓其他線程不能對該隊(duì)列的消息文件作出任何修改
        synchronized (queue) {
            // 由于 gc 操作可能比較耗時(shí), 此處統(tǒng)計(jì)一下執(zhí)行消耗的時(shí)間.
            long gcBegin = System.currentTimeMillis();

            // 1. 創(chuàng)建一個(gè)新的文件
            File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if (queueDataNewFile.exists()) {
                // 正常情況下, 這個(gè)文件不應(yīng)該存在. 如果存在, 就是意外~~ 說明上次 gc 了一半, 程序意外崩潰了.
                throw new MqException("[MessageFileManager] gc 時(shí)發(fā)現(xiàn)該隊(duì)列的 queue_data_new 已經(jīng)存在! queueName=" + queue.getName());
            }
            boolean ok = queueDataNewFile.createNewFile();
            if (!ok) {
                throw new MqException("[MessageFileManager] 創(chuàng)建文件失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
            }

            // 2. 從舊的文件中, 讀取出所有的有效消息對象了. (這個(gè)邏輯直接調(diào)用上述方法即可, 不必重新寫了)
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());

            // 3. 把有效消息, 寫入到新的文件中.
            try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    for (Message message : messages) {
                        byte[] buffer = BinaryTool.toBytes(message);
                        // 先寫四個(gè)字節(jié)消息的長度
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }

            // 4. 刪除舊的數(shù)據(jù)文件, 并且把新的文件進(jìn)行重命名
            File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
            ok = queueDataOldFile.delete();
            if (!ok) {
                throw new MqException("[MessageFileManager] 刪除舊的數(shù)據(jù)文件失敗! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }
            // 把 queue_data_new.txt => queue_data.txt
            ok = queueDataNewFile.renameTo(queueDataOldFile);
            if (!ok) {
                throw new MqException("[MessageFileManager] 文件重命名失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
                        + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }

            // 5. 更新統(tǒng)計(jì)文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(), stat);

            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] gc 執(zhí)行完畢! queueName=" + queue.getName() + ", time="
                    + (gcEnd - gcBegin) + "ms");
        }
    }

五、測試MessageFileManager類


?? 1、“準(zhǔn)備工作”和“收尾工作”

創(chuàng)建MessagefileManagerTests?

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

@SpringBootTest
public class MessageFileManagerTests {
    private MessageFileManger messageFileManger = new MessageFileManger();

    private static final String queueName1 = "testQueue1";
    private static final String queueName2 = "testQueue2";

//  每個(gè)用例執(zhí)行之前的準(zhǔn)備工作
    @BeforeEach
    public void setUp() throws IOException {
//        準(zhǔn)備階段,創(chuàng)建出兩個(gè)隊(duì)列,以備后用
        messageFileManger.createQueueFiles(queueName1);
        messageFileManger.createQueueFiles(queueName2);

    }


//    每個(gè)用例執(zhí)行之后的收尾工作
    @AfterEach
    public void tearDown() throws IOException {
//        收尾階段,把創(chuàng)建出的隊(duì)列銷毀掉
        messageFileManger.destroyQueueFiles(queueName1);
        messageFileManger.destroyQueueFiles(queueName2);
    }
}

?? 2、測試創(chuàng)建文件是否存在

@Test
    public void testCreateFiles(){
//        創(chuàng)建隊(duì)列文件在準(zhǔn)備工作已經(jīng)執(zhí)行過了,這里主要是為了驗(yàn)證文件是否存在
        File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
//        assertEquals(預(yù)期值,實(shí)際值)
        Assertions.assertEquals(true,queueDataFile1.isFile());
        File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile1.isFile());

        File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
        Assertions.assertEquals(true,queueDataFile2.isFile());
        File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
        Assertions.assertEquals(true,queueStatFile2.isFile());
    }

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

這里為了方便查看文件是否創(chuàng)建,就把收尾工作注釋掉了

?項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目


??? 3、測試writetStat和readStat是否能夠通過

@Test
    public void testReadWriteStat(){
        MessageFileManger.Stat stat = new MessageFileManger.Stat();
        stat.totalCount = 100;
        stat.validCount =50;

//        由于writeStat和readStat是私有方法,此處就需要使用反射的方式
//        使用Spring封裝好的反射的工具類
        ReflectionTestUtils.invokeMethod(messageFileManger,"writeStat", queueName1,stat);

//        寫入完畢之后,調(diào)用讀取,驗(yàn)證讀取的結(jié)果和寫入的數(shù)據(jù)是一致的
        MessageFileManger.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);
        Assertions.assertEquals(100,newStat.totalCount);
        Assertions.assertEquals(50,newStat.validCount);

        System.out.println("writetStat和readStat測試通過");
    }

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目


??? 4、測試sendMessage

構(gòu)造創(chuàng)建queue和message的方法:

 private MSGQueue createTestQueue(String queueName){
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);    //是否要持久化
        return queue;
    }

//    構(gòu)造出一條消息
    private Message createTestMessage(String content){
        Message message = Message.createMessageWithId("testRoutingKey",null,content.getBytes());
        return message;
    }

測試sendMessage:

@Test
    public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
//        構(gòu)造出消息,并且構(gòu)造出隊(duì)列
        Message message = createTestMessage("testMessage");
//       創(chuàng)建queue對象
        MSGQueue queue = createTestQueue(queueName1);

//      調(diào)用發(fā)送消息的方法
        messageFileManger.sendMessage(queue,message);

//       檢查stat文件
        MessageFileManger.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManger,"readStat",queueName1);
        Assertions.assertEquals(1,stat.totalCount);
        Assertions.assertEquals(1,stat.validCount);

//        檢查文件,把消息讀出來
        LinkedList<Message> messages = messageFileManger.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(1,messages.size());
        Message curMessage = messages.get(0);
        Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());
        Assertions.assertEquals(message.getDeliverMode(),curMessage.getDeliverMode());

//      比較兩個(gè)字節(jié)數(shù)組的內(nèi)容是否相同,不能直接使用asserEquals
        Assertions.assertArrayEquals(message.getBody(),curMessage.getBody());

        System.out.println("message = "+ curMessage);
    }

?項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

構(gòu)造100條消息,?并且讀取出來

 @Test
    public void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
//      往隊(duì)列中插入100條消息,驗(yàn)證100條消息從文件中讀取之后,是否和最初是一致的
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessge" + 1);
            messageFileManger.sendMessage(queue,message);
            expectedMessages.add(message);
        }

//        讀取所有消息
        LinkedList<Message> actualMessages = messageFileManger.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(expectedMessages.size(),actualMessages.size());
        for (int i = 0; i < expectedMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "]actualMessage = " + actualMessages);

            Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());
            Assertions.assertEquals(0x1,actualMessage.getIsValid());
        }
    }

?項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目


?? 5、測試刪除消息

//    測試刪除消息
    @Test
    public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
        // 創(chuàng)建隊(duì)列, 寫入 10 個(gè)消息. 刪除其中的幾個(gè)消息. 再把所有消息讀取出來, 判定是否符合預(yù)期.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 刪除其中的三個(gè)消息
        messageFileManager.deleteMessage(queue, expectedMessages.get(7));
        messageFileManager.deleteMessage(queue, expectedMessages.get(8));
        messageFileManager.deleteMessage(queue, expectedMessages.get(9));

        // 對比這里的內(nèi)容是否正確.
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(7, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

?? 6、測試?yán)厥?/h3>
 @Test
    public void testGC() throws IOException, MqException, ClassNotFoundException {
        // 先往隊(duì)列中寫 100 個(gè)消息. 獲取到文件大小.
        // 再把 100 個(gè)消息中的一半, 都給刪除掉(比如把下標(biāo)為偶數(shù)的消息都刪除)
        // 再手動調(diào)用 gc 方法, 檢測得到的新的文件的大小是否比之前縮小了.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 獲取 gc 前的文件大小
        File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long beforeGCLength = beforeGCFile.length();

        // 刪除偶數(shù)下標(biāo)的消息
        for (int i = 0; i < 100; i += 2) {
            messageFileManager.deleteMessage(queue, expectedMessages.get(i));
        }

        // 手動調(diào)用 gc
        messageFileManager.gc(queue);

        // 重新讀取文件, 驗(yàn)證新的文件的內(nèi)容是不是和之前的內(nèi)容匹配
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(50, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            // 把之前消息偶數(shù)下標(biāo)的刪了, 剩下的就是奇數(shù)下標(biāo)的元素了.
            // actual 中的 0 對應(yīng) expected 的 1
            // actual 中的 1 對應(yīng) expected 的 3
            // actual 中的 2 對應(yīng) expected 的 5
            // actual 中的 i 對應(yīng) expected 的 2 * i + 1
            Message expectedMessage = expectedMessages.get(2 * i + 1);
            Message actualMessage = actualMessages.get(i);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
        // 獲取新的文件的大小
        File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long afterGCLength = afterGCFile.length();
        System.out.println("before: " + beforeGCLength);
        System.out.println("after: " + afterGCLength);
        Assertions.assertTrue(beforeGCLength > afterGCLength);
    }

項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化},項(xiàng)目,java,項(xiàng)目

六、小結(jié)

MessageFileManager主要是負(fù)責(zé)管理消息在文件中的存儲:

? ? ? ? (1)設(shè)計(jì)了目錄結(jié)構(gòu)和文件格式

? ? ? ? (2)實(shí)現(xiàn)了目錄創(chuàng)建和刪除

? ? ? ? (3)實(shí)現(xiàn)了統(tǒng)計(jì)文件的讀寫

? ? ? ? (4)實(shí)現(xiàn)了消息的寫入

? ? ? ? (5)實(shí)現(xiàn)了消息的刪除

????????(6)實(shí)現(xiàn)了加載所有消息

????????(7)垃圾回收文章來源地址http://www.zghlxwxcb.cn/news/detail-634917.html

到了這里,關(guān)于項(xiàng)目實(shí)戰(zhàn) — 消息隊(duì)列(4){消息持久化}的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊(duì)列,集群,交換機(jī),持久化,生產(chǎn)者、消費(fèi)者)

    RabbitMQ學(xué)習(xí)筆記(消息發(fā)布確認(rèn),死信隊(duì)列,集群,交換機(jī),持久化,生產(chǎn)者、消費(fèi)者)

    MQ(message queue):本質(zhì)上是個(gè)隊(duì)列,遵循FIFO原則,隊(duì)列中存放的是message,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。MQ提供“邏輯解耦+物理解耦”的消息通信服務(wù)。使用了MQ之后消息發(fā)送上游只需要依賴MQ,不需要依賴其它服務(wù)。 功能1:流量消峰 功能2:應(yīng)用解耦 功

    2024年02月07日
    瀏覽(118)
  • 【RabbitMQ筆記08】消息隊(duì)列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費(fèi)者消息確認(rèn)、消息持久化)

    【RabbitMQ筆記08】消息隊(duì)列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費(fèi)者消息確認(rèn)、消息持久化)

    這篇文章,主要介紹消息隊(duì)列RabbitMQ之防止消息丟失的三種方式(生產(chǎn)者消息確認(rèn)、消費(fèi)者消息確認(rèn)、消息持久化)。 目錄 一、防止消息丟失 1.1、消息確認(rèn)機(jī)制(生產(chǎn)者) (1)生產(chǎn)者丟失消息 (2)生產(chǎn)者消息確認(rèn)機(jī)制 1.2、消息確認(rèn)機(jī)制(消費(fèi)者) (1)消費(fèi)者丟失消息

    2024年02月02日
    瀏覽(29)
  • uniapp項(xiàng)目實(shí)戰(zhàn)第五章:小程序Pinia持久化

    uniapp項(xiàng)目實(shí)戰(zhàn)第五章:小程序Pinia持久化

    說明:項(xiàng)目中 Pinia 用法平時(shí)完全一致,主要解決持久化插件 兼容性 問題。 持久化存儲插件 持久化存儲插件: pinia-plugin-persistedstate 插件默認(rèn)使用 localStorage 實(shí)現(xiàn)持久化,小程序端不兼容,需要替換持久化 API。 網(wǎng)頁端持久化 API 多端持久化 API 參考代碼 現(xiàn)在可以持續(xù)化了

    2024年02月01日
    瀏覽(23)
  • ActiveMQ使用JDBC持久化消息

    為了避免服務(wù)器宕機(jī)而導(dǎo)致消息丟失,ActiveMQ提供消息持久化機(jī)制。 ActiveMQ提供多種消息持久化的方式,如LevelDB Store、KahaDB 、AMQ、JDBC等,詳情可以訪問官網(wǎng)。 ActiveMQ默認(rèn)是使用KahaDB持久化消息。在/conf/activemq.xml如下配置: KahaDB是一個(gè)文件型數(shù)據(jù)庫,是以日志形式保存到文件

    2024年02月11日
    瀏覽(21)
  • RabbitMQ隊(duì)列持久化的重要性與意義

    持久化隊(duì)列的一個(gè)主要目的是確保數(shù)據(jù)的安全性。在RabbitMQ中,消息通常存儲在內(nèi)存中,以提高消息傳遞的速度。然而,如果隊(duì)列沒有持久化,一旦RabbitMQ服務(wù)器發(fā)生故障或者重啟,所有未被處理的消息都會丟失。這可能導(dǎo)致數(shù)據(jù)丟失,對于關(guān)鍵業(yè)務(wù)應(yīng)用程序來說是不可接受的

    2024年02月07日
    瀏覽(21)
  • RabbitMQ (HelloWord 消息應(yīng)答 持久化 不公平分發(fā) 預(yù)取值)

    RabbitMQ (HelloWord 消息應(yīng)答 持久化 不公平分發(fā) 預(yù)取值)

    在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。中間的框是一個(gè)隊(duì)列-RabbitMO.代表使用者保留的消息緩沖區(qū) 第一步:導(dǎo)入依賴 第二步:創(chuàng)建生產(chǎn)者 第三步:創(chuàng)建消費(fèi)者 因?yàn)槟銥榱舜_保同一條消息被其中一個(gè)工作線程接收到了之后,其它工作就不能消費(fèi)的到了 三者

    2023年04月14日
    瀏覽(24)
  • 根據(jù)源碼,模擬實(shí)現(xiàn) RabbitMQ - 實(shí)現(xiàn)消息持久化,統(tǒng)一硬盤操作(3)

    根據(jù)源碼,模擬實(shí)現(xiàn) RabbitMQ - 實(shí)現(xiàn)消息持久化,統(tǒng)一硬盤操作(3)

    目錄 一、實(shí)現(xiàn)消息持久化 1.1、消息的存儲設(shè)定 1.1.1、存儲方式 1.1.2、存儲格式約定 1.1.3、queue_data.txt 文件內(nèi)容 ?1.1.4、queue_stat.txt 文件內(nèi)容 1.2、實(shí)現(xiàn) MessageFileManager 類 1.2.1、設(shè)計(jì)目錄結(jié)構(gòu)和文件格式 1.2.2、實(shí)現(xiàn)消息的寫入 1.2.3、實(shí)現(xiàn)消息的刪除(隨機(jī)訪問文件) 1.2.4、獲取隊(duì)

    2024年02月12日
    瀏覽(23)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生產(chǎn)者和消費(fèi)者消息的確認(rèn),消息的持久化以及消費(fèi)失敗的重試機(jī)制

    【RabbitMQ】RabbitMQ 消息的可靠性 —— 生產(chǎn)者和消費(fèi)者消息的確認(rèn),消息的持久化以及消費(fèi)失敗的重試機(jī)制

    在現(xiàn)代分布式應(yīng)用程序中,消息隊(duì)列扮演了至關(guān)重要的角色,允許系統(tǒng)中的各個(gè)組件之間進(jìn)行異步通信。這種通信模式提供了高度的靈活性和可伸縮性,但也引入了一系列的挑戰(zhàn),其中最重要的之一是消息的可靠性。 首先讓我們來了解一下,在消息隊(duì)列中,消息從生產(chǎn)者發(fā)送

    2024年02月05日
    瀏覽(35)
  • 【爬蟲】實(shí)驗(yàn)項(xiàng)目二:模擬登錄和數(shù)據(jù)持久化

    【爬蟲】實(shí)驗(yàn)項(xiàng)目二:模擬登錄和數(shù)據(jù)持久化

    目錄 一、實(shí)驗(yàn)?zāi)康?二、實(shí)驗(yàn)預(yù)習(xí)提示 三、實(shí)驗(yàn)內(nèi)容 實(shí)驗(yàn)要求 基本要求: 改進(jìn)要求A: 改進(jìn)要求B: 四、實(shí)驗(yàn)過程 基本要求: 源碼如下: ?改進(jìn)要求A: 源碼如下: 改進(jìn)要求B: 源碼如下: 五、資料 1.實(shí)驗(yàn)框架代碼: 2.MySQL存儲: 3.實(shí)驗(yàn)小提示 ????????部分網(wǎng)站的數(shù)據(jù)需

    2024年02月10日
    瀏覽(14)
  • 鏈路追蹤SkyWalking整合項(xiàng)目以及數(shù)據(jù)持久化

    鏈路追蹤SkyWalking整合項(xiàng)目以及數(shù)據(jù)持久化

    1.1 通過jar包方式整合 首先我們將一個(gè)簡單的springboot服務(wù)打成jar包。 將其上傳到Linux服務(wù)器中。 準(zhǔn)備一個(gè)啟動腳本,腳本內(nèi)容如下: 等同于 參數(shù)名對應(yīng)agent/config/agent.config配置文件中的屬性。屬性對應(yīng)的源碼: org.apache.skywalking.apm.agent.core.conf.Config.java 1.2 docker方式安裝以及集

    2024年02月10日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包