国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理)

這篇具有很好參考價值的文章主要介紹了模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

前言

1.?創(chuàng)建MemoryDataCenter

2. 封裝Exchange 和 Queue方法

3. 封裝Binding操作

4. 封裝Message操作

4.1 封裝消息中心集合messageMap

4.2 封裝消息與隊列的關(guān)系集合queueMessageMap的操作

5. 封裝未確認(rèn)消息集合waitMessage的操作

6. 從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存中

7. MemoryDataCenter單元測試

結(jié)語


前言

? ? ? ? 上一節(jié)我們總結(jié)了服務(wù)器模塊的硬盤管理,將交換機(jī),隊列,綁定存書到Sqlite數(shù)據(jù)庫中,將消息按照隊進(jìn)行創(chuàng)建文件存儲在本地硬盤中.并且封裝了對于數(shù)據(jù)庫和文件的各種操作.實現(xiàn)了持久化的效果,但是實際的消息存儲/轉(zhuǎn)發(fā),主要靠內(nèi)存的結(jié)構(gòu).對于消息隊列來說,內(nèi)存部分是更關(guān)鍵的,內(nèi)存速度更快,可以達(dá)到更高的并發(fā).本節(jié)就對內(nèi)存管理進(jìn)行封裝.本項目全部代碼已上傳Gitee,鏈接放在文章末尾,歡迎大家訪問!


模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理),消息隊列項目,服務(wù)器,spring boot,java,mq,rabbitmq

1.?創(chuàng)建MemoryDataCenter

路徑:mqserver.datacenter.MemoryDataCenter

模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理),消息隊列項目,服務(wù)器,spring boot,java,mq,rabbitmq

考慮到多線程的原因,我們將HashMap替換成ConcurrentHashMap (對每個哈希桶進(jìn)行加鎖,相對來說是線程安全的)

@Data
public class MemoryDataCenter {
    // 1. 交換機(jī)  多線程環(huán)境下使用,使用ConcurrentHashMap會相對線程安全
    //         key:ExchangeName,value:Exchange對象
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();

    // 2. 隊列  key:QueueName,value:MSQueue對象
    private ConcurrentHashMap<String, MSQueue> queueMap = new ConcurrentHashMap<>();

    // 3. 綁定  key:ExchangeName,value:HashMap(key:QueueName,value:MSQueue對象)
    private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();

    // 4. 消息  key:MessageID,value:Message對象
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();

    // 5. 消息和隊列的映射關(guān)系 HashMap: key:QueueName,value:LinkedList(Message對象)
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();

    // 6. 未確認(rèn)的消息  HashMap: key:QueueName,value:HashMap(key:MessageID,value:Message對象)
    private ConcurrentHashMap<String,ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
}

2. 封裝Exchange 和 Queue方法

主要就是插入和獲取數(shù)據(jù)以及刪除

   /**
     * 1. 針對內(nèi)存中的交換機(jī),隊列設(shè)置操作
     */
    public void insertExchange(Exchange exchange) {
        exchangeMap.put(exchange.getName(), exchange);
        System.out.println("[MemoryDataCenter] 新交換機(jī)添加成功! exchangeName=" + exchange.getName());
    }

    public Exchange getExchange(String exchangeName) {
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName) {
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 交換機(jī)刪除成功! exchangeName=" + exchangeName);
    }

    public void insertQueue(MSQueue queue) {
        queueMap.put(queue.getName(), queue);
        System.out.println("[MemoryDataCenter] 新隊列添加成功! queueName=" + queue.getName());
    }

    public MSQueue getQueue(String queueName) {
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName) {
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 隊列刪除成功! queueName=" + queueName);
    }

3. 封裝Binding操作

這里呢之所以將綁定的操作單獨列舉出來,是因為存儲綁定信息的數(shù)據(jù)結(jié)構(gòu)是相對比較復(fù)雜的,是嵌套的HashMap.

對于插入綁定信息:

1, 首先按照交換機(jī)的名字進(jìn)行查找,如果查找不到就進(jìn)行創(chuàng)建一個HashMap的數(shù)據(jù)結(jié)構(gòu)存儲到含有綁定信息的HashMap中,如果存在的話在按照隊列名字進(jìn)行查找綁定信息,如果查找到了,說明改綁定信息已經(jīng)插入過就不要進(jìn)行插入了,如果沒找到就進(jìn)行插入操作.

2. 在上述查找和插入的操作比并不是原子的,所以我們要給是上述操作,按照bindingMap進(jìn)行加鎖.以保證我們的線程操作是安全的.

下述是相關(guān)對于綁定的操作的代碼:

    /**
     * 2. 針對綁定進(jìn)行操作
     */
    /**
     * 2.1插入綁定信息
     * @param binding
     * @throws MqException
     */
    public void insertBinding(Binding binding) throws MqException {
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if (bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }
        // 先使用 exchangeName 查一下, 對應(yīng)的哈希表是否存在. 不存在就創(chuàng)建一個.
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                k -> new ConcurrentHashMap<>());

        synchronized (bindingMap) {
            // 再根據(jù) queueName 查一下目前的綁定的交換機(jī)綁定的是否是當(dāng)前傳入的隊列. 如果已經(jīng)存在(存在相同的綁定關(guān)系了,就不需要進(jìn)行傳入), 就拋出異常. 不存在才能插入.
            if (bindingMap.get(binding.getQueueName()) != null) {
                throw new MqException("[MemoryDataCenter] 綁定已經(jīng)存在! exchangeName=" + binding.getExchangeName() +
                        ", queueName=" + binding.getQueueName());
            }
            // 最后將綁定關(guān)系傳入到bingMap中
            bindingMap.put(binding.getQueueName(), binding);
        }
        System.out.println("[MemoryDataCenter] 新綁定添加成功! exchangeName=" + binding.getExchangeName()
                + ", queueName=" + binding.getQueueName());
    }

    /**
     * 2.2 獲取綁定1: 根據(jù)exchangeName, queueName 獲取唯一的綁定
     * @param exchangeName
     * @param queueName
     */
    public Binding getBinding(String exchangeName, String queueName){
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if (bindingMap == null){
            return null;
        }
        synchronized (bindingMap){
            // 防止當(dāng)別的操作刪除了這個隊列的綁定信息,而導(dǎo)致的線程錯誤
            return bindingMap.get(queueName);
        }
    }

    /**
     * 2.3 獲取綁定2: 根據(jù)exchangeName 查詢所有綁定
     * @param exchangeName
     * @return
     */
    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) throws MqException {
        if (bindingsMap.get(exchangeName) == null){
            return null;
        }
        return bindingsMap.get(exchangeName);
    }

    /**
     * 2.4 刪除綁定關(guān)系(單個) 一個交換機(jī)對應(yīng)的單個隊列的綁定關(guān)系
     * @param binding
     * @throws MqException
     */
    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if (bindingMap == null) {
            // 該交換機(jī)沒有綁定任何隊列. 報錯.
            throw new MqException("[MemoryDataCenter] 綁定不存在! exchangeName=" + binding.getExchangeName()
                    + ", queueName=" + binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 綁定刪除成功! exchangeName=" + binding.getExchangeName()
                + ", queueName=" + binding.getQueueName());
    }

    /**
     * 2.5 刪除綁定關(guān)系(多個) 1個交換機(jī)對應(yīng)的多個隊列的綁定關(guān)系.
     */
    public void deleteBinding(String exchangeName){
        bindingsMap.remove(exchangeName);
    }

4. 封裝Message操作

4.1 封裝消息中心集合messageMap

  • 1. 添加消息到消息中心
  • 2. 根據(jù)消息ID查詢消息
  • 3. 根據(jù)消息ID刪除消息
 /**
     * 3. 針對消息進(jìn)行操作
     */
    /**
     * 3.1 添加消息
     * @param message
     */
    public void addMessage(Message message) {
        messageMap.put(message.getMessageID(), message);
        System.out.println("[MemoryDataCenter] 新消息添加成功! messageId=" + message.getMessageID());
    }

    /**
     * 3.2 根據(jù) id 查詢消息
     * @param messageId
     * @return
     */
    public Message getMessage(String messageId) {
        return messageMap.get(messageId);
    }

    /**
     * 3.3 根據(jù) id 刪除消息
     * @param messageId
     */
    public void removeMessage(String messageId) {
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息被移除! messageId=" + messageId);
    }

4.2 封裝消息與隊列的關(guān)系集合queueMessageMap的操作

  • 1. 發(fā)送消息到指定隊列名字的隊列
  • 2. 從指定隊列中獲取消息集合
  • 3. 獲取指定隊列名字隊列中消息的個數(shù)
 /**
     * 4 針對消息和隊列的關(guān)系進(jìn)行操作
     */
    /**
     * 4.1 發(fā)送消息到指定隊列
     * @param queue
     * @param message
     */
    public void sendMessage(MSQueue queue, Message message) {
        // 先根據(jù)隊列的名字, 找到該隊列對應(yīng)的消息鏈表.
        // 先根據(jù)隊列的名字進(jìn)行查詢,查不到就進(jìn)行創(chuàng)建該隊列對應(yīng)的鏈表  // computeIfAbsent線程安全的
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k-> new LinkedList<>());
        // 再把數(shù)據(jù)加到 messages 里面
        synchronized (messages) {
            // 對該隊列進(jìn)行添加的時候需要進(jìn)行加鎖
            messages.add(message);
        }
        // 在這里把該消息也往消息中心中插入一下. 假設(shè)如果 message 已經(jīng)在消息中心存在, 重復(fù)插入也沒關(guān)系.
        // 主要就是相同 messageId, 對應(yīng)的 message 的內(nèi)容一定是一樣的. (服務(wù)器代碼不會對 Message 內(nèi)容做修改 basicProperties 和 body)
        addMessage(message);
        System.out.println("[MemoryDataCenter] 消息被添加到隊列中! messageId=" + message.getMessageID());
    }

    /**
     * 4.2 從指定隊列名字中進(jìn)行提取信息
     * @param queueName
     * @return
     */
    public Message pollMessage(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        // 隊列中沒有信息
        if (messages == null){
            System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
            return null;
        }
        // 將隊列進(jìn)行頭刪除(提取信息)
        synchronized (messages){
            if (messages.size() == 0){
                System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
                return null;
            }
            Message currentMessage = messages.remove(0); System.out.println
                    ("[MemoryDataCenter] 消息已經(jīng)從隊列中取出! queueName=" + queueName + ", MessageID=" + currentMessage.getMessageID() );
            return currentMessage;
        }
    }

    /**
     * 4.3 獲取指定隊列名字中消息的個數(shù)
     * @param queueName
     * @return
     */
    public int getMessageCount(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        // 隊列中沒有信息
        if (messages == null){
            System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
            return 0;
        }
        // 將隊列進(jìn)行頭刪除(提取信息)
        synchronized (messages){
            if (messages.size() == 0){
                System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
                return 0;
            }
            return messages.size();
        }
    }

5. 封裝未確認(rèn)消息集合waitMessage的操作

  • 1.?添加消息到等待確認(rèn)隊列
  • 2. 從指定未確認(rèn)隊列中刪除消息
  • 3. 根據(jù)指定的消息ID與未確認(rèn)隊列名字獲取消息內(nèi)容
/**
     * 5. 未確認(rèn)消息Map的操作
     */

    /**
     * 5.1 添加消息到指定等待確認(rèn)隊列
     * @param queueName
     * @param message
     */
    public void addMessageWaitAck(String queueName, Message message){
        ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap
                .computeIfAbsent(queueName, k-> new ConcurrentHashMap<>());
            waitMessage.put(message.getMessageID(),message);
            System.out.println("[MemoryDataCenter] 消息進(jìn)入等待確認(rèn)隊列! messageID=" + message.getMessageID());

    }

    /**
     * 5.2 從指定的未確認(rèn)消息隊列中進(jìn)行刪除消息
     * @param queueName
     * @param messageId
     */
    public void removeMessageWaitAck(String queueName, String messageId){
        ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap.get(queueName);
        if (waitMessage == null){
            System.out.println("[MemoryDataCenter] 該隊列為空! queueName=" + queueName);
            return;
        }

        waitMessage.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息已經(jīng)從等待確認(rèn)隊列中移除! messageId=" + messageId);

    }

    /**
     * 5.3 根據(jù)指定消息ID從隊列中進(jìn)行獲取信息
     * @param queueName
     * @param messageId
     * @return
     */
    public Message geMessageWaitAck(String queueName, String messageId){
        ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap.get(queueName);
        if (waitMessage == null){
            System.out.println("[MemoryDataCenter] 該隊列為空! queueName=" + queueName);
            return null;
        }
        return waitMessage.get(messageId);
    }

6. 從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存中

使用之前封裝過的diskDataCenter進(jìn)行恢復(fù)數(shù)據(jù).

1. 清空當(dāng)前內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)

2. 恢復(fù)所有的交換機(jī),隊列,綁定,消息數(shù)據(jù),恢復(fù)消息數(shù)據(jù)的時候,要將消息中心和消息與隊列的映射進(jìn)行恢復(fù).

/**
     * 6. 從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存中 (使用之前封裝好的管理硬盤的類進(jìn)行實現(xiàn))
     */
    public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        // 1. 清空內(nèi)存中各種數(shù)據(jù)信息
        queueMap.clear();
        exchangeMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        // 2. 恢復(fù)所有的交換機(jī)信息
        List<Exchange> exchanges = diskDataCenter.selectAllExchange();
        for (Exchange exchange :exchanges) {
             exchangeMap.put(exchange.getName(),exchange);
        }
        // 3. 恢復(fù)所有的隊列信息
        List<MSQueue> queues = diskDataCenter.selectAllMSQueue();
        for (MSQueue msQueue :queues) {
            queueMap.put(msQueue.getName(),msQueue);
        }

        // 4. 恢復(fù)所有的綁定數(shù)據(jù)
        List<Binding> bindings = diskDataCenter.selectAllBinding();
        for (Binding binding: bindings){
            ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.
                    computeIfAbsent(binding.getExchangeName(), k-> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(),binding);
        }

        // 4. 恢復(fù)所有的消息數(shù)據(jù)

        // 4.1 遍歷所有的隊列
        // List<MSQueue> queues = diskDataCenter.selectAllMSQueue();
        for (MSQueue msQueue:queues) {
            LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(msQueue.getName());
            // 4.2 將獲取的消息進(jìn)行進(jìn)行加入到隊列
            queueMessageMap.put(msQueue.getName(),messages);
            // 4.3 將消息添加上到消息中心
            for (Message message : messages) {
                messageMap.put(message.getMessageID(),message);
            }
        }

7. MemoryDataCenter單元測試

package com.example.demo.mqserver.datacenter;

import com.example.demo.DemoApplication;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:
 * User: YAO
 * Date: 2023-07-31
 * Time: 10:30
 */
@SpringBootTest
class MemoryDataCenterTest {
    MemoryDataCenter memoryDataCenter = null;

    @BeforeEach
    void setUp() {
        memoryDataCenter = new MemoryDataCenter();
    }

    @AfterEach
    void tearDown() {
        memoryDataCenter = null;
    }

    // 創(chuàng)建一個測試交換機(jī)
    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setAutoDelete(false);
        exchange.setDurable(true);
        return exchange;
    }

    // 創(chuàng)建一個測試隊列
    private MSQueue createTestQueue(String queueName) {
        MSQueue queue = new MSQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setExclusive(false);
        queue.setAutoDelete(false);
        return queue;
    }

    /**
     * 1. 針對交換機(jī)進(jìn)行操作
     */
    @Test
    public void testExchange(){
        // 1. 創(chuàng)建交換機(jī)進(jìn)行插入
        Exchange expectExchange = createTestExchange("testExchange");
        memoryDataCenter.insertExchange(expectExchange);
        // 2. 查詢交換機(jī)
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        // 比較內(nèi)存中的引用是否是同一個引用
        Assertions.assertEquals(expectExchange,actualExchange);
        // 3. 刪除交換機(jī)
        memoryDataCenter.deleteExchange("testExchange");
        // 4. 查詢交換機(jī),比較結(jié)果
        actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertNull(actualExchange);
    }

    /**
     * 2. 針對隊列進(jìn)行操作
     */
    @Test
    public void testQueue(){
        // 1. 創(chuàng)建交換機(jī)進(jìn)行插入
        MSQueue expectQueue = createTestQueue("testQueue");
        memoryDataCenter.insertQueue(expectQueue);
        // 2. 查詢交換機(jī)
        MSQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        // 比較內(nèi)存中的引用是否是同一個引用
        Assertions.assertEquals(expectQueue,actualQueue);
        // 3. 刪除交換機(jī)
        memoryDataCenter.deleteQueue("testQueue");
        // 4. 查詢交換機(jī),比較結(jié)果
        actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertNull(actualQueue);
    }

    /**
     * 3. 針對綁定進(jìn)行測試
     */
    @Test
    public void testBinding() throws MqException {
        // 1.創(chuàng)建綁定并加入到集合中
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        memoryDataCenter.insertBinding(expectedBinding);
        // 2. 查詢綁定(單個)
        Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertEquals(expectedBinding,actualBinding);
        // 2.1 查詢所有的綁定
        ConcurrentHashMap<String, Binding> bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertEquals(1, bindingMap.size());
        Assertions.assertEquals(expectedBinding, bindingMap.get("testQueue"));

        // 3. 刪除綁定
        memoryDataCenter.deleteBinding("testExchange");
        actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertNull(actualBinding);
        bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertNull(bindingMap);
    }

    private Message createTestMessage(String content) {
        Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
        return message;
    }
    /**
     * 4. 針對消息進(jìn)行測試
     */
    @Test
    public void testMessage(){
        // 1. 創(chuàng)建消息并插入
        Message expectedMessage = createTestMessage("testMessage");
        memoryDataCenter.addMessage(expectedMessage);

        // 2. 查詢消息并比較
        Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageID());
        Assertions.assertEquals(expectedMessage, actualMessage);

        // 4. 刪除消息
        memoryDataCenter.removeMessage(expectedMessage.getMessageID());

        // 5. 查詢消息并比較
        actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageID());
        Assertions.assertNull(actualMessage);
    }

    /**
     * 5. 測試將消息發(fā)送到對列中
     */
    @Test
    public void sendMessage(){
        // 1. 創(chuàng)建一個隊列. 創(chuàng)建10條消息,進(jìn)行插入到隊列
        MSQueue expectQueue = createTestQueue("testQueue");
        List<Message> expectMessage = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            memoryDataCenter.sendMessage(expectQueue,message);
            expectMessage.add(message);
        }
        // 2.從隊列進(jìn)行取出消息
        List<Message> actualMessage = new ArrayList<>();
        while (true){
            Message message = memoryDataCenter.pollMessage("testQueue");
            if (message == null){
                break;
            }
            actualMessage.add(message);
        }
        // 3. 比較消息前后是否一致
        Assertions.assertEquals(expectMessage.size(),actualMessage.size());
        for (int i = 0; i < expectMessage.size(); i++) {
            Assertions.assertEquals(expectMessage.get(i),actualMessage.get(i));
        }
    }

    /**
     * 6. 測試未被確認(rèn)的消息
     */
    @Test
    public void testMessageWaitAck(){
        // 1. 創(chuàng)建消息,插入到未被確認(rèn)的隊列中
        Message expectedMessage = createTestMessage("expectedMessage");
        memoryDataCenter.addMessageWaitAck("testQueue", expectedMessage);

        // 2. 獲取消息從未被確認(rèn)的隊列中
        Message actualMessage = memoryDataCenter.geMessageWaitAck("testQueue", expectedMessage.getMessageID());
        Assertions.assertEquals(expectedMessage, actualMessage);

        // 3. 從未被確認(rèn)的隊列中進(jìn)行刪除消息
        memoryDataCenter.removeMessageWaitAck("testQueue", expectedMessage.getMessageID());
        // 4. 比較刪除之后的隊列是否還有消息
        actualMessage = memoryDataCenter.geMessageWaitAck("testQueue", expectedMessage.getMessageID());
        Assertions.assertNull(actualMessage);
    }

    /**
     * 7. 測試從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存
     */
    @Test
    public void testRecovery() throws IOException, MqException, ClassNotFoundException {
        // 由于后續(xù)需要進(jìn)行數(shù)據(jù)庫操作, 依賴 MyBatis. 就需要先啟動 SpringApplication, 這樣才能進(jìn)行后續(xù)的數(shù)據(jù)庫操作.
        DemoApplication.context = SpringApplication.run(DemoApplication.class);

        // 1. 在硬盤上構(gòu)造好數(shù)據(jù)
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();

        // 構(gòu)造交換機(jī)
        Exchange expectedExchange = createTestExchange("testExchange");
        diskDataCenter.insertExchange(expectedExchange);

        // 構(gòu)造隊列
        MSQueue expectedQueue = createTestQueue("testQueue");
        diskDataCenter.insertQueue(expectedQueue);

        // 構(gòu)造綁定
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        diskDataCenter.insertBinding(expectedBinding);

        // 構(gòu)造消息
        Message expectedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(expectedQueue, expectedMessage);

        // 2. 執(zhí)行恢復(fù)操作
        memoryDataCenter.recovery(diskDataCenter);

        // 3. 對比結(jié)果
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());
        Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());
        Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());
        Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());

        MSQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());
        Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());
        Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());
        Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());

        Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());
        Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());
        Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());

        Message actualMessage = memoryDataCenter.pollMessage("testQueue");
        Assertions.assertEquals(expectedMessage.getMessageID(), actualMessage.getMessageID());
        Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
        Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());

        // 4. 清理硬盤的數(shù)據(jù), 把整個 data 目錄里的內(nèi)容都刪掉(包含了 meta.db 和 隊列的目錄).
        DemoApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }
}

模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理),消息隊列項目,服務(wù)器,spring boot,java,mq,rabbitmq


結(jié)語

????????以上內(nèi)容就是針對內(nèi)存管理的封裝,主要是設(shè)計了6中數(shù)據(jù)機(jī)構(gòu)進(jìn)行存儲交換機(jī) 隊列 綁定 消息 消息和隊列的映射 未確認(rèn)信息.后續(xù)對數(shù)據(jù)進(jìn)行操作的時候會更加具有效率.這樣我們虛擬主機(jī)中兩大核心部分:硬盤管理和內(nèi)存管理都總結(jié)完成,下一節(jié)會對上述兩種操作進(jìn)一步封裝到(VirtualHost)中,然后正式的提出消息隊列服務(wù)器BrokerServer這個概念,對其進(jìn)行完善和功能封裝.請持續(xù)關(guān)注,謝謝!!!

完整的項目代碼已上傳Gitee,歡迎大家訪問.??????

模擬實現(xiàn)消息隊列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理),消息隊列項目,服務(wù)器,spring boot,java,mq,rabbitmq文章來源地址http://www.zghlxwxcb.cn/news/detail-634032.html

到了這里,關(guān)于模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Windows Server 2012 R2服務(wù)器Microsoft 消息隊列遠(yuǎn)程代碼執(zhí)行漏洞CVE-2023-21554補(bǔ)丁KB5025288的安裝及問題解決

    Windows Server 2012 R2服務(wù)器Microsoft 消息隊列遠(yuǎn)程代碼執(zhí)行漏洞CVE-2023-21554補(bǔ)丁KB5025288的安裝及問題解決

    近日,系統(tǒng)安全掃描中發(fā)現(xiàn)Windows Server 2012 R2服務(wù)器存在Microsoft 消息隊列遠(yuǎn)程代碼執(zhí)行漏洞。本文記錄補(bǔ)丁安裝中遇到的“此更新不適用于你的計算機(jī)”問題及解決辦法。 一、問題描述: 1、系統(tǒng)安全掃描中發(fā)現(xiàn)Windows Server 2012 R2服務(wù)器存在Microsoft 消息隊列遠(yuǎn)程代碼執(zhí)行漏洞,

    2024年02月10日
    瀏覽(24)
  • Web服務(wù)器實現(xiàn)|基于阻塞隊列線程池的Http服務(wù)器|線程控制|Http協(xié)議

    Web服務(wù)器實現(xiàn)|基于阻塞隊列線程池的Http服務(wù)器|線程控制|Http協(xié)議

    代碼地址:WebServer_GitHub_Addr 摘要 本實驗通過C++語言,實現(xiàn)了一個基于阻塞隊列線程池的多線程Web服務(wù)器。該服務(wù)器支持通過http協(xié)議發(fā)送報文,跨主機(jī)抓取服務(wù)器上特定資源。與此同時,該Web服務(wù)器后臺通過C++語言,通過原生系統(tǒng)線程調(diào)用 pthread.h ,實現(xiàn)了一個 基于阻塞隊列

    2024年02月07日
    瀏覽(20)
  • Qt實現(xiàn)客戶端與服務(wù)器消息發(fā)送

    Qt實現(xiàn)客戶端與服務(wù)器消息發(fā)送

    里用Qt來簡單設(shè)計實現(xiàn)一個場景,即: (1)兩端:服務(wù)器QtServer和客戶端QtClient (2)功能:服務(wù)端連接客戶端,兩者能夠互相發(fā)送消息,傳送文件,并且顯示文件傳送進(jìn)度。 環(huán)境:VS20013 + Qt5.11.2 + Qt設(shè)計師 先看效果: 客戶端與服務(wù)器的基本概念不說了,關(guān)于TCP通信的三次握

    2024年02月11日
    瀏覽(22)
  • 【網(wǎng)絡(luò)】UDP網(wǎng)絡(luò)服務(wù)器簡單模擬實現(xiàn)

    【網(wǎng)絡(luò)】UDP網(wǎng)絡(luò)服務(wù)器簡單模擬實現(xiàn)

    【網(wǎng)絡(luò)】UDP網(wǎng)絡(luò)服務(wù)器簡單模擬實現(xiàn) UDP的封裝 : UDP網(wǎng)絡(luò)服務(wù)器模擬實現(xiàn):主要分為makefile文件進(jìn)行編譯 UDP客戶端 :udpClient.cc(客戶端的調(diào)用),udpClient.hpp(客戶端的實現(xiàn)) UDP服務(wù)端 :udpServer.cc(服務(wù)端的調(diào)用),udpServer.hpp(服務(wù)端的實現(xiàn)) 創(chuàng)建makefile文件: makefile里可以定義變

    2024年02月08日
    瀏覽(31)
  • 【Python】OPC UA模擬服務(wù)器實現(xiàn)

    【Python】OPC UA模擬服務(wù)器實現(xiàn)

    ?在工業(yè)自動化和物聯(lián)網(wǎng)(IoT)領(lǐng)域,OPC UA(開放平臺通信統(tǒng)一架構(gòu))已經(jīng)成為一種廣泛采用的數(shù)據(jù)交換標(biāo)準(zhǔn)。它提供了一種安全、可靠且獨立于平臺的方式來訪問實時數(shù)據(jù)。在本文中,我們將探討如何使用Python和OPC UA庫來創(chuàng)建一個高效的數(shù)據(jù)服務(wù)器,該服務(wù)器能夠從CSV文件

    2024年04月29日
    瀏覽(38)
  • 微信小程序?qū)崿F(xiàn)訂閱消息功能(Node服務(wù)器篇)

    微信小程序?qū)崿F(xiàn)訂閱消息功能(Node服務(wù)器篇)

    ?? ? ? ?* 源碼已經(jīng)上傳到資源處,需要的話點擊跳轉(zhuǎn)下載 |??源碼下載 ????????在上一篇內(nèi)容當(dāng)中在微信小程序中實現(xiàn)訂閱消息功能,都在客戶端(小程序)中來實現(xiàn)的,在客戶端中模擬了服務(wù)器端來進(jìn)行發(fā)送訂閱消息的功能,那么本篇就將上一篇內(nèi)容中僅在客戶端中實現(xiàn)

    2024年02月03日
    瀏覽(94)
  • 基于RabbitMQ的模擬消息隊列之二---創(chuàng)建項目及核心類

    基于RabbitMQ的模擬消息隊列之二---創(chuàng)建項目及核心類

    創(chuàng)建一個SpringBoot項目,環(huán)境:JDK8,添加依賴:Spring Web、MyBatis FrameWork(最主要) 2.核心類 在mqserver包中添加一個包,名字為core,表示核心類。 Exchange ExchangeType MSGQueue (為了區(qū)分Queue) Binding Message BasicProperties

    2024年02月11日
    瀏覽(16)
  • 使用Java服務(wù)器實現(xiàn)UDP消息的發(fā)送和接收(多線程)

    使用Java服務(wù)器實現(xiàn)UDP消息的發(fā)送和接收(多線程)

    在本篇博客中,我們將介紹如何使用Java服務(wù)器來實現(xiàn)UDP消息的發(fā)送和接收,并通過多線程的方式來處理并發(fā)請求。UDP(User Datagram Protocol)是一種無連接、不可靠的傳輸協(xié)議,適合于實時性要求高的應(yīng)用場景,如實時游戲、語音通信等。 步驟: 首先,我們需要導(dǎo)入Java提供的

    2024年02月12日
    瀏覽(37)
  • SSE與WebSocket分別實現(xiàn)服務(wù)器發(fā)送消息通知(Golang、Gin)

    SSE與WebSocket分別實現(xiàn)服務(wù)器發(fā)送消息通知(Golang、Gin)

    服務(wù)端推送,也稱為消息推送或通知推送,是一種允許應(yīng)用服務(wù)器主動將信息發(fā)送到客戶端的能力,為客戶端提供了實時的信息更新和通知,增強(qiáng)了用戶體驗。 服務(wù)端推送的背景與需求主要基于以下幾個訴求: 實時通知:在很多情況下,用戶期望實時接收到應(yīng)用的通知,如

    2024年02月03日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包