国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機)

這篇具有很好參考價值的文章主要介紹了模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

前言

1. 創(chuàng)建VirtualHost

1.1 定義虛擬主機的相關(guān)屬性

1.2 VirtualHost 構(gòu)造方法?

1.3 交換機和隊列的創(chuàng)建和刪除

1.3.1 交換機操作

1.3.2 隊列操作?

1.4 綁定的創(chuàng)建和刪除

1.5?發(fā)送消息到指定的隊列/交換機

2. 實現(xiàn)路由規(guī)則Router

2.1?checkBindingKey()

2.2?checkRoutingKey()

2.3?route()

2.4 單元測試

3. 訂閱消息

3.1 添加一個訂閱者

3.2 創(chuàng)建訂閱者管理類ConsumerManager

3.3 訂閱消息小結(jié)

4. 消息確認?basicAck()

5. VirtualHost單元測試

結(jié)語


前言

? ? ? ? 寫到這里,內(nèi)存和硬盤的數(shù)據(jù)就組織完畢了,接下來我們就會引入在消息隊列初識中提出的一個概念 --- 虛擬主機.簡單回顧一下虛擬主機的概念: 它類似于MySQL的database,是一個邏輯的集合,一個BrokerServer上可以存在多個VirtualHost.在一個BrokerServer上可以組織不同的數(shù)據(jù),可以使用不同的虛擬主機做出邏輯上的區(qū)分.本章節(jié)就是進行進一步的封裝,同時實現(xiàn)一些消息隊列的API.這里需要注意的是在RabbitMq中,虛擬主機是可以隨便創(chuàng)建和刪除的,在本項目目前只是默認只有一個虛擬主機的存在,后續(xù)根據(jù)情況會進行擴展,這里也提前預留了對于多虛擬主機的管理的數(shù)據(jù)結(jié)構(gòu).保證了不同虛擬機中的交換機 隊列 綁定 消息都是相互隔離的.本項目全部代碼已上傳Gitee,鏈接放在文章末尾,歡迎大家訪問!


模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

1. 創(chuàng)建VirtualHost

??????

注意: 這一塊比較重要也比較復雜,所以將代碼進行截圖加標注的形式進行總結(jié),完整的VirtualHost.class代碼會在講解完給出.

??????

1.1 定義虛擬主機的相關(guān)屬性

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

Router: 是用來定義交換機轉(zhuǎn)發(fā)的規(guī)則,主要實現(xiàn)的是對routingKey進行驗證以及判斷,具體的細節(jié)會在后面給出.

ConsumerManager: 實現(xiàn)的是管理消費者進行消費.

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

以上兩者就是鎖對象了,后續(xù)我們要對硬盤和內(nèi)存進行數(shù)據(jù)的讀寫,為了保證操作的原子性,以及線程安全我們會給相關(guān)操作進行加鎖.?

1.2 VirtualHost 構(gòu)造方法?

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java主要就是傳入虛擬主機的名字,對該虛擬主機的數(shù)據(jù)庫以及文件信息進行初始化,主要是對數(shù)據(jù)庫進行初始化.具體DataBaseManager.init()

初始化內(nèi)容如下:

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

?初始化完成,將硬盤中的數(shù)據(jù)恢復到內(nèi)存中

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

至此前置工作就差不多了.下面對一些重要的方法進行創(chuàng)建.

1.3 交換機和隊列的創(chuàng)建和刪除

1.3.1 交換機操作

如果交換機不存在就進行創(chuàng)建,存在就直接返回(ExchangeDeclare)

  • 1. 更改交換機的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字(更加方便后續(xù)的管理)
  • 2. 判斷交換機是否存在: 存在直接返回true即可,不存在就直接創(chuàng)建新的交換機即可.設置交換機的屬性,根據(jù)是否持久化寫入到硬盤,然后在寫入到內(nèi)存.這里需要注意的是,我們一定要先寫硬盤再寫內(nèi)存,因為些硬盤是一個失敗率很高的事情,經(jīng)常會因為文件權(quán)限問題導致數(shù)據(jù)寫入不進去.如果先寫內(nèi)存,而硬盤寫入不進去,就還需要堆內(nèi)存的數(shù)據(jù)進行刪除,這就很繁瑣了.
  • 3. 以上整個操作是對交換機進行讀寫操作,為了保證線程安全,我們進行加鎖操作.
/**
     * 1. 創(chuàng)建交換機
     * 如果交換機不存在就進行創(chuàng)建,存在就直接返回
     */
    // 創(chuàng)建交換機
    // 如果交換機不存在, 就創(chuàng)建. 如果存在, 直接返回.
    // 返回值是 boolean. 創(chuàng)建成功, 返回 true. 失敗返回 false
    public boolean exchangeDeclare(String exchangeName,
                                   ExchangeType exchangeType,
                                   boolean durable,
                                   boolean autoDelete,
                                   Map<String, Object> arguments) {

        // 1. 更改交換機的名字 交換機的名字 = 虛擬主機 + 交換機
        exchangeName = virtualHostName + exchangeName;
        try{
            synchronized (exchangeLocker){
                // 2. 判定該交換機是否存在
                Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);

                if (existsExchange != null){
                    System.out.println("[VirtualHost] 交換機已經(jīng)存在!");
                    return true;
                }
                // 3. 不存在,直接進行創(chuàng)建新的交換機
                Exchange exchange = new Exchange();
                exchange.setName(exchangeName);
                exchange.setType(exchangeType);
                exchange.setDurable(durable);
                exchange.setAutoDelete(autoDelete);
                exchange.setArguments(arguments);
                // 4. 將構(gòu)造好的交換機進行寫入硬盤(含有持久化信息的交換機)  先寫硬盤后寫內(nèi)存
                if (durable){
                    diskDataCenter.insertExchange(exchange);
                }
                // 5. 將交換機寫入到內(nèi)存中
                memoryDataCenter.insertExchange(exchange);
                System.out.println("[VirtualHost] 交換機創(chuàng)建完成! exchangeName="+exchangeName);
                // 上述操作為什么不先寫內(nèi)存后寫硬盤?
                // 因為寫硬盤操作比較容易出現(xiàn)異常,如果寫入硬盤失敗,寫入內(nèi)存成功,再進行從內(nèi)存中進行刪除就比較麻煩了
            }
            return true;
        } catch (Exception e){
            System.out.println("[VirtualHost] 交換機創(chuàng)建失敗! exchangeName="+exchangeName);
            e.printStackTrace();
            return false;
        }
    }

刪除交換機

  • 1. 更改交換機的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字(更加方便后續(xù)的管理)
  • 2. 根據(jù)交換機的名字得到交換機對象,判斷交換機是否為空,不為空進行刪除操作,還是先進行刪除硬盤的數(shù)據(jù),再刪除內(nèi)存中數(shù)據(jù)
  • 3. 以上整個操作是對交換機進行讀寫操作,為了保證線程安全,我們進行加鎖操作.
/**
     * 2.刪除交換機
     * @param exchangeName 交換機名字
     * @return
     */
    public boolean exchangeDelete(String exchangeName) {
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                // 1. 先找到對應的交換機.
                Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
                if (toDelete == null) {
                    throw new MqException("[VirtualHost] 交換機不存在無法刪除!");
                }
                // 2. 刪除硬盤上的數(shù)據(jù)
                if (toDelete.isDurable()) {
                    diskDataCenter.deleteExchange(exchangeName);
                }
                // 3. 刪除內(nèi)存中的交換機數(shù)據(jù)
                memoryDataCenter.deleteExchange(exchangeName);
                System.out.println("[VirtualHost] 交換機刪除成功! exchangeName=" + exchangeName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 交換機刪除失敗! exchangeName=" + exchangeName);
            e.printStackTrace();
            return false;
        }
    }

1.3.2 隊列操作?

針對隊列創(chuàng)建和刪除操作,這里就不做過多的解釋了,過程跟上述交換機的操作一樣. 下面給出代碼:

/**
     * 3. 創(chuàng)建隊列
     * @param queueName 隊列名
     * @param durable 持久化
     * @param exclusive 隊列獨有
     * @param autoDelete 自動刪除
     * @param arguments 其他聲明
     * @return
     */
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                                Map<String, Object> arguments) {
        // 把隊列的名字, 給拼接上虛擬主機的名字.
        queueName = virtualHostName + queueName;
        try {
            synchronized (queueLocker) {
                // 1. 判定隊列是否存在
                MSQueue existsQueue = memoryDataCenter.getQueue(queueName);
                if (existsQueue != null) {
                    System.out.println("[VirtualHost] 隊列已經(jīng)存在! queueName=" + queueName);
                    return true;
                }
                // 2. 創(chuàng)建隊列對象
                MSQueue queue = new MSQueue();
                queue.setName(queueName);
                queue.setDurable(durable);
                queue.setExclusive(exclusive);
                queue.setAutoDelete(autoDelete);
                queue.setArguments(arguments);
                // 3. 寫硬盤
                if (durable) {
                    diskDataCenter.insertQueue(queue);
                }
                // 4. 寫內(nèi)存
                memoryDataCenter.insertQueue(queue);
                System.out.println("[VirtualHost] 隊列創(chuàng)建成功! queueName=" + queueName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 隊列創(chuàng)建失敗! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 4. 刪除隊列
     * @param queueName 隊列名
     * @return
     */
    public boolean queueDelete(String queueName) {
        queueName = virtualHostName + queueName;
        try {
            synchronized (queueLocker) {
                // 1. 根據(jù)隊列名字, 查詢下當前的隊列對象
                MSQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                    throw new MqException("[VirtualHost] 隊列不存在! 無法刪除! queueName=" + queueName);
                }
                // 2. 刪除硬盤數(shù)據(jù)
                if (queue.isDurable()) {
                    diskDataCenter.deleteQueue(queueName);
                }
                // 3. 刪除內(nèi)存數(shù)據(jù)
                memoryDataCenter.deleteQueue(queueName);
                System.out.println("[VirtualHost] 刪除隊列成功! queueName=" + queueName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 刪除隊列失敗! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

1.4 綁定的創(chuàng)建和刪除

  • 1. 更改交換機和隊列的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字??隊列名字 = 虛擬主機的名字 + 隊列的名字
  • 2. 根據(jù)交換機和隊列的名字得到綁定信息的對象,判斷綁定是否為空,不為空拋出異常
  • 3. 綁定對象為空: 1, 判斷綁定的bindingKey是否合法. 2.合法就創(chuàng)建綁定對象,設置響應的綁定屬性.
  • 4.?獲取一下對應的交換機和隊列. 如果交換機或者隊列不存在, 這樣的綁定也是無法創(chuàng)建的.
  • 5. 寫入硬盤,再寫內(nèi)存
  • 6. 以上整個操作是對交換機和隊列進行讀寫操作,為了保證線程安全,我們進行加鎖操作.

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java?這一步我們在Router進行設置一個方法,等下面更加詳細的介紹router類.

/**
     * 5. 創(chuàng)建綁定
     * @param queueName 隊列名字
     * @param exchangeName 交換機名字
     * @param bindingKey 綁定規(guī)則
     * @return
     */
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
        // 1. 轉(zhuǎn)換交換機和隊列的名字
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker){
                synchronized (queueLocker){
                    // 2. 判斷交換機和隊列是否已經(jīng)綁定成功
                    Binding existBinding = memoryDataCenter.getBinding(exchangeName,queueName);
                    if (existBinding != null){
                        throw new MqException("[VirtualHost] binding 已經(jīng)存在! queueName=" + queueName+ ", exchangeName=" + exchangeName);
                    }

                    // 3. 驗證bing中的bindingKey 是否合法
                    if (!router.checkBindingKey(bindingKey)){
                        throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);
                    }
                    // 4. 創(chuàng)建綁定對象
                    Binding binding = new Binding();
                    binding.setExchangeName(exchangeName);
                    binding.setQueueName(queueName);
                    binding.setBindingKey(bindingKey);
                    // 5. 獲取對應的交換機和隊列,判斷是否是存在的
                    MSQueue queue = memoryDataCenter.getQueue(queueName);
                    if (queue == null) {
                        throw new MqException("[VirtualHost] 隊列不存在! queueName=" + queueName);
                    }
                    Exchange exchange = memoryDataCenter.getExchange(exchangeName);
                    if (exchange == null) {
                        throw new MqException("[VirtualHost] 交換機不存在! exchangeName=" + exchangeName);
                    }
                    // 5. 先寫硬盤
                    if (queue.isDurable() && exchange.isDurable()) {
                        diskDataCenter.insertBinding(binding);
                    }
                    // 6. 寫入內(nèi)存
                    memoryDataCenter.insertBinding(binding);
                    System.out.println("[VirtualHost] 綁定創(chuàng)建成功! exchangeName=" + exchangeName
                            + ", queueName=" + queueName);
                }
                return true;
            }
        } catch (MqException e) {
            System.out.println("[VirtualHost] 綁定創(chuàng)建失敗! exchangeName=" + exchangeName
                    + ", queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

刪除綁定?

  • 1. 更改交換機和隊列的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字??隊列名字 = 虛擬主機的名字 + 隊列的名字
  • 2. 根據(jù)交換機和隊列的名字得到綁定信息的對象,判斷綁定是否為空,為空拋出異常
  • 3. 從硬盤進行刪除,從內(nèi)存進行刪除
  • 4. 以上整個操作是對交換機和隊列進行讀寫操作,為了保證線程安全,我們進行加鎖操作.這里需要注意的是,我們對交換機和隊列進行加鎖的時候,順序要和創(chuàng)建綁定的順序是一致的.不然會出現(xiàn)死鎖的現(xiàn)象.
/**
     * 6. 刪除綁定
     * @param queueName 隊列名
     * @param exchangeName 交換機名字
     * @return
     */
    public boolean queueUnbind(String queueName, String exchangeName) {
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                synchronized (queueLocker) {
                    // 1. 獲取 binding 看是否已經(jīng)存在~
                    Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (binding == null) {
                        throw new MqException("[VirtualHost] 刪除綁定失敗! 綁定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);
                    }
                    // 2. 無論綁定是否持久化了, 都嘗試從硬盤刪一下. 就算不存在, 這個刪除也無副作用.
                    diskDataCenter.deleteBinding(binding);
                    // 3. 刪除內(nèi)存的數(shù)據(jù)
                    memoryDataCenter.deleteBinding(binding);
                    System.out.println("[VirtualHost] 刪除綁定成功!");
                }
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 刪除綁定失敗!");
            e.printStackTrace();
            return false;
        }
    }

1.5?發(fā)送消息到指定的隊列/交換機

發(fā)布消息其實就是把消息發(fā)送到指定的交換機中,然后根據(jù)綁定關(guān)系發(fā)送到指定的隊列

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

  • 1. 更改交換機和隊列的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字??隊列名字 = 虛擬主機的名字 + 隊列的名字?
  • 2. 檢查消息的routingKey是否合法,不合法拋出異常
  • 3. 根據(jù)傳入的交換機的名字進行查找交換機對象,然后判斷交換機的類型,而進行下一步的行為.
  • 4. 如果交換機類型為DIRECT,則表示為直接交換機,則把routingKey作為隊列的名字,先進行根據(jù)傳入的參數(shù),創(chuàng)建消息對象,然后按照剛才組合好的隊列名字進行查找隊列,查找隊列進行發(fā)送消息,沒查找進行拋出異常.發(fā)送消息的時候判斷消息是否是持久化的,是持久化就往硬盤中寫入,否則只寫內(nèi)存就可以.發(fā)送完消息之后,要進行重要的操作.通知消費者進行消費消息.這一塊是在管理消費者進行消費消息實現(xiàn)的.
  • 5. 如果交換機類型為Fanout 或者 Topic 我們需要在Router中進行設置相應的路由規(guī)則.
/**
     * 9. 發(fā)送消息到指定的隊列/交換機
     */
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
        try {
            // 1. 轉(zhuǎn)換交換機的名字
            exchangeName = virtualHostName + exchangeName;
            // 2. 檢查 routingKey 是否合法.
            if (!router.checkRoutingKey(routingKey)) {
                throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
            }
            // 3. 查找交換機對象
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if (exchange == null) {
                throw new MqException("[VirtualHost] 交換機不存在! exchangeName=" + exchangeName);
            }
            // 4. 判定交換機的類型
            if (exchange.getType() == ExchangeType.DIRECT) {
                // 按照直接交換機的方式來轉(zhuǎn)發(fā)消息
                // 以 routingKey 作為隊列的名字, 直接把消息寫入指定的隊列中.
                // 此時, 可以無視綁定關(guān)系.
                String queueName = virtualHostName + routingKey;
                // 5. 構(gòu)造消息對象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                // 6. 查找該隊列名對應的對象
                MSQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                    throw new MqException("[VirtualHost] 隊列不存在! queueName=" + queueName);
                }
                // 7. 隊列存在, 直接給隊列中寫入消息
                sendMessage(queue, message);
            } else {
                // 按照 fanout 和 topic 的方式來轉(zhuǎn)發(fā).
                // 5. 找到該交換機關(guān)聯(lián)的所有綁定, 并遍歷這些綁定對象
                ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
                for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
                    // 1) 獲取到綁定對象, 判定對應的隊列是否存在
                    Binding binding = entry.getValue();
                    MSQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
                    if (queue == null) {
                        // 此處咱們就不拋出異常了. 可能此處有多個這樣的隊列.
                        // 希望不要因為一個隊列的失敗, 影響到其他隊列的消息的傳輸.
                        System.out.println("[VirtualHost] basicPublish 發(fā)送消息時, 發(fā)現(xiàn)隊列不存在! queueName=" + binding.getQueueName());
                        continue;
                    }
                    // 2) 構(gòu)造消息對象
                    Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                    // 3) 判定這個消息是否能轉(zhuǎn)發(fā)給該隊列.
                    //    如果是 fanout, 所有綁定的隊列都要轉(zhuǎn)發(fā)的.
                    //    如果是 topic, 還需要判定下, bindingKey 和 routingKey 是不是匹配.
                    if (!router.route(exchange.getType(), binding, message)) {
                        continue;
                    }
                    // 4) 真正轉(zhuǎn)發(fā)消息給隊列
                    sendMessage(queue, message);
                }
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 消息發(fā)送失敗!");
            e.printStackTrace();
            return false;
        }
    }

    private void sendMessage(MSQueue queue, Message message) throws IOException, MqException, InterruptedException {
        // 此處發(fā)送消息, 就是把消息寫入到 硬盤 和 內(nèi)存 上. 根據(jù)此條消息時是否要進行持久化進行判斷
        int deliverMode = message.getDeliverMode();
        // deliverMode 為 1 , 不持久化. deliverMode 為 2 表示持久化.
        if (deliverMode == 2) {
            diskDataCenter.sendMessage(queue, message);
        }
        // 寫入內(nèi)存
        memoryDataCenter.sendMessage(queue, message);

        // 此處還需要補充一個邏輯, 通知消費者可以消費消息了.
        consumerManager.notifyConsume(queue.getName());
    }

2. 實現(xiàn)路由規(guī)則Router

?這個類我們實現(xiàn)具體的路由轉(zhuǎn)發(fā)規(guī)則,對之前還沒實現(xiàn)的方法進行實現(xiàn).還未實現(xiàn)的方法具體如下:

1. 在創(chuàng)建綁定的時候我們對bindingKey進行驗證是否合法checkBindingKey();

2. 在往交換機進行發(fā)送消息的時候,我們對消息的routingKey進行驗證\checkRoutingKey();

3. 當消息插入到交換機之后,根據(jù)交換機的主題往隊列中分發(fā)消息的時候.對不同主題的交換機實現(xiàn)不同的路由規(guī)則route();

以上是我們在虛擬主機類中還沒有進行實現(xiàn)的方法.下面進行一一實現(xiàn):

2.1?checkBindingKey()

?以下是我們合法的BindingKey的規(guī)則

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

/**
     * 驗證bindingKey是否是合法的
     *     1. 數(shù)字, 字母, 下劃線
     *     2. 使用 . 分割成若干部分
     *     3. 允許存在 * 和 # 作為通配符. 但是通配符只能作為獨立的分段.
     * @return
     */
    public boolean checkBindingKey(String bindingKey){
        if (bindingKey.length() == 0) {
            // 空字符串, 也是合法情況. 比如在使用 direct / fanout 交換機的時候, bindingKey 是用不上的.
            return true;
        }
        // 檢查字符串中不能存在非法字符
        for (int i = 0; i < bindingKey.length(); i++) {
            char ch = bindingKey.charAt(i);
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
                continue;
            }
            return false;
        }
        // 檢查 * 或者 # 是否是獨立的部分.
        // aaa.*.bbb 合法情況;  aaa.a*.bbb 非法情況.
        String[] words = bindingKey.split("\\.");
        for (String word : words) {
            // 檢查 word 長度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
            if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
                return false;
            }
        }
        // 約定一下, 通配符之間的相鄰關(guān)系(人為約定的).
        // 為啥這么約定? 因為前三種相鄰的時候, 實現(xiàn)匹配的邏輯會非常繁瑣, 同時功能性提升不大~~
        // 1. aaa.#.#.bbb    => 非法
        // 2. aaa.#.*.bbb    => 非法
        // 3. aaa.*.#.bbb    => 非法
        // 4. aaa.*.*.bbb    => 合法
        for (int i = 0; i < words.length - 1; i++) {
            // 連續(xù)兩個 ##
            if (words[i].equals("#") && words[i + 1].equals("#")) {
                return false;
            }
            // # 連著 *
            if (words[i].equals("#") && words[i + 1].equals("*")) {
                return false;
            }
            // * 連著 #
            if (words[i].equals("*") && words[i + 1].equals("#")) {
                return false;
            }
        }
        return true;
    }

2.2?checkRoutingKey()

驗證routingKey是合法的.routingKey是與BindingKey進行匹配的,所以必須是具體的.????????

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

?

/**
     * 驗證routingKey是否是合法的
     *      1. 數(shù)字, 字母, 下劃線
     *      2. 使用 . 分割成若干部分
     * @return
     */
    public boolean checkRoutingKey(String routingKey){
        if (routingKey.length() == 0){
            // 空字符串,合法的情況  當交換機的類型為fanout的時候,是不需要的,所以可以設置為""
            return true;
        }
        for (int i = 0; i < routingKey.length(); i++) {
            char ch = routingKey.charAt(i);
            // 判定該字符是否是大寫字母
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            // 判定該字母是否是小寫字母
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            // 判定該字母是否是阿拉伯數(shù)字
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            // 判定是否是 _ 或者 .
            if (ch == '_' || ch == '.') {
                continue;
            }
            // 該字符, 不是上述任何一種合法情況, 就直接返回 false
            return false;
        }
        // 把每個字符都檢查過, 沒有遇到非法情況. 此時直接返回 true
        return true;
    }

2.3?route()

判斷交換機的類型進而得出是否可以進行給隊列進行轉(zhuǎn)發(fā)消息.

1. 交換機的類型為fanout.代表給交換機進行綁定的所有隊列進行轉(zhuǎn)發(fā)消息.

2. 交換機的類型為Topic,需要對routingKey進行判斷.進而設置給隊列轉(zhuǎn)發(fā)消息

/**
     *  判斷是否可以給綁定的交換機進行轉(zhuǎn)發(fā)消息
     * @return
     */
    public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {
        if (type == ExchangeType.FANOUT){
            // 如果交換機類型為 fan-out 就直接進行返回true,表示轉(zhuǎn)發(fā)給當前當前綁定的所有對列
            return true;
        }else if(type == ExchangeType.TOPIC){
            // 如果是主題交換機,規(guī)則就比較復雜
            return routerTopic(binding,message);
        }else {
            throw new MqException("[Router] 交換機類型有誤 exchangeType=" + type);
        }
    }

?對于主題交換機,我們進行詳細的講解.

  • 1.?將bindingKey 和 routingKey 進行按照"."進行分割成字符串數(shù)組
  • 2. 定義下標進行遍歷數(shù)組
  • 3. 遍歷兩個數(shù)組,主要分為5種情況.
    • 3.1? 當bindingKey遇到*號時直接跳過*,兩個下標都進行自增1
    • 3.2 當bindingKey遇到#號,如果此時#號是bindingKey的最后一位,那么直接返回true
    • 3.3 當bindingKey遇到#號,如果此時#號不是最后一位,就去匹配#號下一位在routingKey的部分,匹配到了就將routingIndex指到匹配的位置,進而在進行上述循環(huán),如果沒匹配到就返回false
    • 3.4?此時沒有遇見通配符,所有的內(nèi)容部都要進行匹配上,匹配不上就返回false
    • 3.5 最后判斷此時兩個數(shù)組的下標是否都比較到了末尾.比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失敗的
/**
     * 用來實現(xiàn):topic類型的交換機的轉(zhuǎn)發(fā)規(guī)則
     * @param binding  綁定信息對象
     * @param message  消息對象
     * @return
     */
    private boolean routerTopic(Binding binding, Message message) {
        // 1. 將bindingKey 和 routingKey 進行按照"."進行分割
        String[] bindingTokens = binding.getBindingKey().split("\\.");
        String[] routingTokens = message.getRoutingKey().split("\\.");
        // 2. 定義用來遍歷數(shù)組的下標
        int bindingIndex = 0;
        int routingIndex = 0;
        // 3. 進行遍歷兩個數(shù)組
        while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){
            if (bindingTokens[bindingIndex].equals("*")){
                // (1.)遇到*號兩個下標直接跳過 * 可以匹配一個部分
                bindingIndex++;
                routingIndex++;
            }else if (bindingTokens[bindingIndex].equals("#")){
                bindingIndex += 1;
                // (2.)遇到#號   # 可以匹配多個部分
                if (bindingIndex == bindingTokens.length){
                    // (3.)當遇到#號,#號的下標為最后一個元素的時候,直接返回true,因為可以直接匹配后面所有的內(nèi)容
                    return true;
                }else {
                    // (4.)當遇到#號,后面后還有內(nèi)容的時候,就去匹配#號下一個部分在routingKey的部分,
                    // 匹配了就直接將bindingIndex指到bindingTokens下一個部分,同時將routingIndex指到匹配的地方
                    // 沒匹配配到就返回false
                    routingIndex = findNextMatch(routingIndex,routingTokens,bindingTokens[bindingIndex]);
                    if (routingIndex == -1){
                        return false;
                    }
                    bindingIndex++;
                    routingIndex++;
                }
            }else {
                // (5.)此時沒有遇見通配符,所有的內(nèi)容部都要進行匹配上
                if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){
                    return false;
                }
                bindingIndex++;
                routingIndex++;
            }
        }
        // (6.)最后判斷此時兩個數(shù)組的下標是否都比較到了末尾
        // 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失敗的
        if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
            return true;
        }
        return false;
    }


    /**
     * 給定起始下標去在一個數(shù)組中尋找指定數(shù)組元素,找到就返回該元素在數(shù)組的下標,沒找到就返回-1;
     * @param routingIndex   起始下標
     * @param routingTokens  目標數(shù)組
     * @param bindingToken   目標元素
     * @return
     */
    private int findNextMatch(int routingIndex, String[] routingTokens, String bindingToken) {
        for (int i = routingIndex; i < routingTokens.length; i++) {
            if (routingTokens[i].equals(bindingToken)){
                return i;
            }
        }
        return -1;
    }

以上就是整個Router的所有方法.我們對上述代碼進行單元測試.

2.4 單元測試

?模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

?

package com.example.demo.mqserver.core;

import com.example.demo.common.MqException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:測試交換機的轉(zhuǎn)發(fā)規(guī)則(交換機類型為topic)
 * User: YAO
 * Date: 2023-08-01
 * Time: 13:56
 */
@SpringBootTest
class RouterTest {

    private Router router = new Router();
    private Binding binding = null;
    private Message message = null;

    @BeforeEach
    public void setUp() {
        binding = new Binding();
        message = new Message();
    }

    @AfterEach
    public void tearDown() {
        binding = null;
        message = null;
    }

    /**
     *  [測試用例]
     *      binding key          routing key         result
     *      aaa                  aaa                 true
     *      aaa.bbb              aaa.bbb             true
     *      aaa.bbb              aaa.bbb.ccc         false
     *      aaa.bbb              aaa.ccc             false
     *      aaa.bbb.ccc          aaa.bbb.ccc         true
     *      aaa.*                aaa.bbb             true
     *      aaa.*.bbb            aaa.bbb.ccc         false
     *      *.aaa.bbb            aaa.bbb             false
     *      #                    aaa.bbb.ccc         true
     *      aaa.#                aaa.bbb             true
     *      aaa.#                aaa.bbb.ccc         true
     *      aaa.#.ccc            aaa.ccc             true
     *      aaa.#.ccc            aaa.bbb.ccc         true
     *      aaa.#.ccc            aaa.aaa.bbb.ccc     true
     *      #.ccc                ccc                 true
     *      #.ccc                aaa.bbb.ccc         true
     */
    @Test
    public void test1() throws MqException {
        binding.setBindingKey("aaa");
        message.setRoutingKey("aaa");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test2() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test3() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test4() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test5() throws MqException {
        binding.setBindingKey("aaa.bbb.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test6() throws MqException {
        binding.setBindingKey("aaa.*");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test7() throws MqException {
        binding.setBindingKey("aaa.*.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test8() throws MqException {
        binding.setBindingKey("*.aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test9() throws MqException {
        binding.setBindingKey("#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test10() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test11() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test12() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test13() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test14() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test15() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test16() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
}

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

?單元測試通過.

3. 訂閱消息

????????在我們的虛擬主機中進行添加方法完成消息的訂閱.要想完成消息的訂閱,就需要在消息隊列中新建一個列表consumerEnvList用來存儲消費者的信息,當有消息進行存儲到隊列的時候,此時選出消費者進行消費消息.而消費者消費信息的這個環(huán)境需要單獨定義一個類ConsumerEnv進行描述.以上這個消費信息的過程我們定義一個類ConsumerManager進行管理這些邏輯.

3.1 添加一個訂閱者

給隊列添加消費者,當隊列接收到消息的時候,就要將消息推送給訂閱者

public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        // 構(gòu)造一個 ConsumerEnv 對象, 把這個對應的隊列找到, 再把這個 Consumer 對象添加到該隊列中.
        queueName = virtualHostName + queueName;
        try {
            consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
            System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] basicConsume 失敗! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

此處插入的參數(shù)Consumer相當于一個回調(diào)函數(shù),就是一個函數(shù)式接口.我們在common中進行定義Consumer

@FunctionalInterface
public interface Consumer {
    // Delivery 的意思是 "投遞", 這個方法預期是在每次服務器收到消息之后, 來調(diào)用.
    // 通過這個方法把消息推送給對應的消費者.
    // (注意! 這里的方法名和參數(shù), 也都是參考 RabbitMQ 展開的)
    void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException;
}

定義這個回調(diào)函數(shù)表示:收到消息之后要對消息進行處理.

3.2 創(chuàng)建訂閱者管理類ConsumerManager

1.??這個類是和虛擬主機是一一對應的,每個虛擬主機都有一個管理消費者的對象,而管理的消費者的對象對應的是與之對應的.

2. 我們采用一個堵塞隊列來記錄收到消息的的隊列名字,每次隊列收到消息,就會往這個隊列中進行添加隊列的名字,然后后續(xù)進行通知這個隊列的消費者進行消費消息.

3. 單獨使用一個線程池用來執(zhí)行消息的回調(diào).(主要是獲取到消息之后,給響應設置消息的屬性與消息本體發(fā)送給客戶端.)

4. 我們設置一個掃描線程,從堵塞隊列不斷地取出元素,進而找到隊列,在這個隊列進行消費消息,并且設置掃描線程為后臺線程,這樣就不會阻止進程的結(jié)束.

public class ConsumerManager {

    // 1. 持有虛擬主機對象的引用,用來操作數(shù)據(jù)
    private VirtualHost parent;

    // 2. 指定一個線程池,負責執(zhí)行具體的回調(diào)任務
    private ExecutorService workPool = Executors.newCachedThreadPool();

    // 3. 存放令牌的隊列,存放接收到消息的隊列名字(堵塞隊列)
    // 當這個堵塞隊列一接收到隊列的名字,掃描線程就會就會找到虛擬主機,然后找到這個隊列,進而消費消息
    private BlockingQueue<String> tokenQueue = new LinkedBlockingDeque<>();

    // 4. 掃描線程  (關(guān)注令牌隊列中添加了哪些隊列的名字,就知道哪些隊列添加了消息,取出消息,進而交給線程池,進行消費這些消息)
    private Thread scannerThread = null;
}

1. 給堵塞隊列設置接口,供虛擬主機進行調(diào)用.

/**
     * 1. 收到消息,通知消費者進行消費消息(將消息對應的隊列名字添加到堵塞隊列中)
     */
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

2. 實現(xiàn)掃描線程

public ConsumerManager(VirtualHost p) {
        parent = p;

        scannerThread = new Thread(() -> {
            while (true) {
                try {
                    // 1. 拿到令牌
                    String queueName = tokenQueue.take();
                    // 2. 根據(jù)令牌, 找到隊列
                    MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
                    if (queue == null) {
                        throw new MqException("[ConsumerManager] 取令牌后發(fā)現(xiàn), 該隊列名不存在! queueName=" + queueName);
                    }
                    // 3. 從這個隊列中消費一個消息.
                    synchronized (queue) {
                        consumeMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    e.printStackTrace();
                }
            }
        });
        // 把線程設為后臺線程.
        // 后臺線程不會影響進程的結(jié)束
        scannerThread.setDaemon(true);
        scannerThread.start();
    }

3. 添加消費者環(huán)境ConsumerEnv到指定的隊列

我們在common中實現(xiàn)這個類

@Data
public class ConsumerEnv {
    // 1. 消費者的身份標識
    private String consumerTag;
    // 2. 消費者消費隊列的名字
    private String queueName;
    // 3. 是否自動應答
    private boolean autoAck;
    // 4. 通過這個回調(diào)函數(shù)來處理收到的消息.
    private Consumer consumer;

    public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        this.consumerTag = consumerTag;
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumer = consumer;
    }
}

(1) 按照指定的隊列名找到這個類.

(2)?創(chuàng)建消費者環(huán)境對象,進行添加,同時如果這個隊列的消息存在,就需要進行消費這些信息,調(diào)用consumeMessage()方法傳入隊列的名字.

/**
     * 2. 新增Consumer對象到指定的對列
     */
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        // 找到對應的隊列.
        MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
        if (queue == null) {
            throw new MqException("[ConsumerManager] 隊列不存在! queueName=" + queueName);
        }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
        synchronized (queue) {
            queue.addConsumerEnv(consumerEnv);
            // 如果當前隊列中已經(jīng)有了一些消息了, 需要立即就消費掉.
            int n = parent.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 這個方法調(diào)用一次就消費一條消息.
                consumeMessage(queue);
            }
        }
    }

4. 消費消息 consumeMessage()

(1) 因為一個隊列中可能會有多個消費者,我們按照輪詢的方式進行挑選消費者進行消費消息,在隊列的類中,設置方法chooseConsumer()

/**
     * 挑選訂閱者 進行消費隊列中的消息 (輪詢的方式)
     * @return
     */
    public ConsumerEnv chooseConsumer(){
        // 1. 如果當前隊列對應的消費者的數(shù)量為0,直接返回null,表示沒有篩選到消費者
        if (consumerEnvList.size() == 0){
            return null;
        }
        // 2. 使用當前訂閱到的下標進行對消費者列表取模,然后進行挑選消費者記性消費消息,實現(xiàn)消息的輪詢消費
        int index = consumerSeq.get() % consumerEnvList.size();
        consumerSeq.getAndIncrement();
        return consumerEnvList.get(index);
    }

(2) 從隊列中取出消息

(3) 把消息帶入到回調(diào)方法,交給線程池進行執(zhí)行

/**
     * 消費者進行消費信息
     * @param queue
     */
    private void consumeMessage(MSQueue queue) {
        // 1. 按照輪詢的方式, 找個消費者出來.
        ConsumerEnv luckyDog = queue.chooseConsumer();
        if (luckyDog == null) {
            // 當前隊列沒有消費者, 暫時不消費. 等后面有消費者出現(xiàn)再說.
            return;
        }
        // 2. 從隊列中取出一個消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        System.out.println(message);
        if (message == null) {
            // 當前隊列中還沒有消息, 也不需要消費.
            return;
        }
        // 3. 把消息帶入到消費者的回調(diào)方法中, 丟給線程池執(zhí)行.
        workPool.submit(() -> {
            try {
                // 1. 把消息放到待確認的集合中. 這個操作勢必在執(zhí)行回調(diào)之前.
                parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
                // 2. 真正執(zhí)行回調(diào)操作
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
                        message.getBody());
                // 3. 如果當前是 "自動應答" , 就可以直接把消息刪除了.
                //    如果當前是 "手動應答" , 則先不處理, 交給后續(xù)消費者調(diào)用 basicAck 方法來處理.

                if (luckyDog.isAutoAck()) {
                    // 此時是自動應答,表示直接刪除
                    // 1) 刪除硬盤上的消息
                    if (message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    // 2) 刪除上面的待確認集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageID());

                    // 3) 刪除內(nèi)存中消息中心里的消息
                    parent.getMemoryDataCenter().removeMessage(message.getMessageID());
                    System.out.println("[ConsumerManager] 消息被成功消費! queueName=" + queue.getName());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

3.3 訂閱消息小結(jié)

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

?4. 消息確認?basicAck()

?此處是消費者在回調(diào)函數(shù)中對消息進行處理之后再回調(diào)函數(shù)中執(zhí)行的.

  • 1. 獲取要刪除消息以及所在隊列的對象
  • 2. 刪除硬盤和內(nèi)存的數(shù)據(jù)
  • 3. 刪除未確認消息集合的數(shù)據(jù)
/**
     * 消費者消費完消息進行手動應答
     * @return
     */
    public boolean basicAck(String queueName, String messageId){
        queueName = virtualHostName + queueName;
        try {
            // 1. 獲取要刪除消息以及所在隊列的對象
            Message message = memoryDataCenter.getMessage(messageId);
            if (message == null){
                throw new MqException("[VirtualHost] 確認的信息不存在 messageId="+messageId);
            }
            MSQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue == null){
                throw new MqException("[VirtualHost] 確認的隊列不存在 queueName="+queueName);
            }

            // 2
            // 1.)刪除硬盤中的數(shù)據(jù)
            if(message.getDeliverMode() == 2){
                diskDataCenter.deleteMessage(queue,message);
            }
            // 2.) 刪除消息中心的消息
            memoryDataCenter.removeMessage(message.getMessageID());

            // 3.) 刪除委未確認消息集合的消息
            memoryDataCenter.removeMessageWaitAck(queue.getName(),message.getMessageID());
            System.out.println("[VirtualHost] basicAck成功 消息被確認成功  queueName=" + queueName
            + ",messageId:." + messageId);
            return true;
        } catch (MqException | ClassNotFoundException | IOException e) {
            e.printStackTrace();
            System.out.println("[VirtualHost] basicAck失敗 消息被確認失敗  queueName=" + queueName
                    + ",messageId:." + messageId);
            return false;
        }
    }

至此以上就是VirtualHost的全部內(nèi)容,內(nèi)容很多,很繁瑣需要,靜下心來仔細的體會.

5. VirtualHost單元測試

模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java

?

package com.example.demo.mqserver;

import ch.qos.logback.core.util.FileUtil;
import com.example.demo.DemoApplication;
import com.example.demo.common.Consumer;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Created with IntelliJ IDEA.
 * Description:虛擬主機的操作測試
 * User: YAO
 * Date: 2023-08-01
 * Time: 18:26
 */
class VirtualHostTest {@Autowired
    public VirtualHost  virtualHost = null;

    @BeforeEach
    void setUp() {
        DemoApplication.context = SpringApplication.run(DemoApplication.class);
        // 創(chuàng)建好虛擬主機對象
        virtualHost = new VirtualHost("default");
    }

    @AfterEach
    void tearDown() throws IOException {
        DemoApplication.context.close();
        //把硬盤的目錄進行刪除
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }

    @Test
    void exchangeDeclare() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        Assertions.assertTrue(ok);
    }

    @Test
    void exchangeDelete() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.exchangeDelete("testExchange");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueDeclare() {
        boolean ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        Assertions.assertTrue(ok);
    }

    @Test
    void queueDelete() {
        boolean ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.queueDelete("testQueue");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueBind() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.queueBind("testQueue","testExchange",
                "testBindingKey");
        Assertions.assertTrue(ok);
    }

    @Test
    void queueUnbind() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.queueBind("testQueue","testExchange",
                "testBindingKey");
        ok = virtualHost.queueUnbind("testQueue","testExchange");
        Assertions.assertTrue(ok);
    }

    @Test
    void basicPublish() {
        boolean ok = virtualHost.exchangeDeclare("testExchange",
                ExchangeType.DIRECT,true,false,null);
        ok = virtualHost.queueDeclare("testQueue",
                true,false,false,null);
        ok = virtualHost.basicPublish("testExchange","testQueue",null,"Hello".getBytes(StandardCharsets.UTF_8));
        Assertions.assertTrue(ok);
    }

    /**
     * 1. 先訂閱, 后發(fā)布消息
     */
    @Test
    public void testBasicConsume1() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true,
                false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true, false, null);
        Assertions.assertTrue(ok);

        // 先訂閱隊列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消費者自身設定的回調(diào)方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 再發(fā)送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);
    }

    /**
     *  先發(fā)送消息, 后訂閱隊列.
     */
    @Test
    public void testBasicConsume2() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true,
                false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true, false, null);
        Assertions.assertTrue(ok);

        // 先發(fā)送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);

        // 再訂閱隊列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消費者自身設定的回調(diào)方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);
        Thread.sleep(500);
    }

    @Test
    public void testBasicConsumeFanout() throws InterruptedException {
        // 創(chuàng)建一個交換機,并且綁定兩個隊列
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue1", "testExchange", "");
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue2", "testExchange", "");
        Assertions.assertTrue(ok);
        // 發(fā)布消息發(fā)到交換機
        ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 兩個消費者訂閱上述的兩個隊列.
        ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicConsumeTopic() throws InterruptedException {
        // 1. 創(chuàng)建交換機(主題交換機)
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);
        Assertions.assertTrue(ok);
        // 2. 創(chuàng)建隊列
        ok = virtualHost.queueDeclare("testQueue", false, false, false, null);
        Assertions.assertTrue(ok);
        // 3. 將交換機和隊列進行綁定(設置bindingKey)
        ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
        Assertions.assertTrue(ok);
        // 4. 發(fā)布消息(設置routingKey)
        ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
        Assertions.assertTrue(ok);
        // 5. 訂閱消息
        ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicAck() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true,
                false, false, null);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true, false, null);
        Assertions.assertTrue(ok);

        // 先發(fā)送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);

        // 再訂閱隊列 [要改的地方, 把 autoAck 改成 false]
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消費者自身設定的回調(diào)方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);

                // [要改的地方, 新增手動調(diào)用 basicAck]
                boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
                Assertions.assertTrue(ok);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }
}

結(jié)語

? ? ? ? 本文將整個VirtualHost進行了實現(xiàn),實現(xiàn)了供BrokerServer調(diào)用的API.基礎的消息隊列框架已經(jīng)搭建好了,接下來就是搭建服務器和客戶端了.請持續(xù)關(guān)注,謝謝!!!

完整的項目代碼已上傳Gitee,歡迎大家訪問.??????

模擬實現(xiàn)消息隊列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機),消息隊列項目,spring boot,rabbitmq,服務器,java文章來源地址http://www.zghlxwxcb.cn/news/detail-643139.html

到了這里,關(guān)于模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • Windows Server 2012 R2服務器Microsoft 消息隊列遠程代碼執(zhí)行漏洞CVE-2023-21554補丁KB5025288的安裝及問題解決

    Windows Server 2012 R2服務器Microsoft 消息隊列遠程代碼執(zhí)行漏洞CVE-2023-21554補丁KB5025288的安裝及問題解決

    近日,系統(tǒng)安全掃描中發(fā)現(xiàn)Windows Server 2012 R2服務器存在Microsoft 消息隊列遠程代碼執(zhí)行漏洞。本文記錄補丁安裝中遇到的“此更新不適用于你的計算機”問題及解決辦法。 一、問題描述: 1、系統(tǒng)安全掃描中發(fā)現(xiàn)Windows Server 2012 R2服務器存在Microsoft 消息隊列遠程代碼執(zhí)行漏洞,

    2024年02月10日
    瀏覽(24)
  • Web服務器實現(xiàn)|基于阻塞隊列線程池的Http服務器|線程控制|Http協(xié)議

    Web服務器實現(xiàn)|基于阻塞隊列線程池的Http服務器|線程控制|Http協(xié)議

    代碼地址:WebServer_GitHub_Addr 摘要 本實驗通過C++語言,實現(xiàn)了一個基于阻塞隊列線程池的多線程Web服務器。該服務器支持通過http協(xié)議發(fā)送報文,跨主機抓取服務器上特定資源。與此同時,該Web服務器后臺通過C++語言,通過原生系統(tǒng)線程調(diào)用 pthread.h ,實現(xiàn)了一個 基于阻塞隊列

    2024年02月07日
    瀏覽(21)
  • Qt實現(xiàn)客戶端與服務器消息發(fā)送

    Qt實現(xiàn)客戶端與服務器消息發(fā)送

    里用Qt來簡單設計實現(xiàn)一個場景,即: (1)兩端:服務器QtServer和客戶端QtClient (2)功能:服務端連接客戶端,兩者能夠互相發(fā)送消息,傳送文件,并且顯示文件傳送進度。 環(huán)境:VS20013 + Qt5.11.2 + Qt設計師 先看效果: 客戶端與服務器的基本概念不說了,關(guān)于TCP通信的三次握

    2024年02月11日
    瀏覽(22)
  • 極光Java 版本服務器端實現(xiàn)別名消息推送

    REST API 文檔:

    2024年02月15日
    瀏覽(20)
  • 【網(wǎng)絡】UDP網(wǎng)絡服務器簡單模擬實現(xiàn)

    【網(wǎng)絡】UDP網(wǎng)絡服務器簡單模擬實現(xiàn)

    【網(wǎng)絡】UDP網(wǎng)絡服務器簡單模擬實現(xiàn) UDP的封裝 : UDP網(wǎng)絡服務器模擬實現(xiàn):主要分為makefile文件進行編譯 UDP客戶端 :udpClient.cc(客戶端的調(diào)用),udpClient.hpp(客戶端的實現(xiàn)) UDP服務端 :udpServer.cc(服務端的調(diào)用),udpServer.hpp(服務端的實現(xiàn)) 創(chuàng)建makefile文件: makefile里可以定義變

    2024年02月08日
    瀏覽(31)
  • 【Python】OPC UA模擬服務器實現(xiàn)

    【Python】OPC UA模擬服務器實現(xiàn)

    ?在工業(yè)自動化和物聯(lián)網(wǎng)(IoT)領(lǐng)域,OPC UA(開放平臺通信統(tǒng)一架構(gòu))已經(jīng)成為一種廣泛采用的數(shù)據(jù)交換標準。它提供了一種安全、可靠且獨立于平臺的方式來訪問實時數(shù)據(jù)。在本文中,我們將探討如何使用Python和OPC UA庫來創(chuàng)建一個高效的數(shù)據(jù)服務器,該服務器能夠從CSV文件

    2024年04月29日
    瀏覽(38)
  • 微信小程序?qū)崿F(xiàn)訂閱消息功能(Node服務器篇)

    微信小程序?qū)崿F(xiàn)訂閱消息功能(Node服務器篇)

    ?? ? ? ?* 源碼已經(jīng)上傳到資源處,需要的話點擊跳轉(zhuǎn)下載 |??源碼下載 ????????在上一篇內(nèi)容當中在微信小程序中實現(xiàn)訂閱消息功能,都在客戶端(小程序)中來實現(xiàn)的,在客戶端中模擬了服務器端來進行發(fā)送訂閱消息的功能,那么本篇就將上一篇內(nèi)容中僅在客戶端中實現(xiàn)

    2024年02月03日
    瀏覽(95)
  • 基于RabbitMQ的模擬消息隊列之二---創(chuàng)建項目及核心類

    基于RabbitMQ的模擬消息隊列之二---創(chuàng)建項目及核心類

    創(chuàng)建一個SpringBoot項目,環(huán)境:JDK8,添加依賴:Spring Web、MyBatis FrameWork(最主要) 2.核心類 在mqserver包中添加一個包,名字為core,表示核心類。 Exchange ExchangeType MSGQueue (為了區(qū)分Queue) Binding Message BasicProperties

    2024年02月11日
    瀏覽(16)
  • 使用Java服務器實現(xiàn)UDP消息的發(fā)送和接收(多線程)

    使用Java服務器實現(xiàn)UDP消息的發(fā)送和接收(多線程)

    在本篇博客中,我們將介紹如何使用Java服務器來實現(xiàn)UDP消息的發(fā)送和接收,并通過多線程的方式來處理并發(fā)請求。UDP(User Datagram Protocol)是一種無連接、不可靠的傳輸協(xié)議,適合于實時性要求高的應用場景,如實時游戲、語音通信等。 步驟: 首先,我們需要導入Java提供的

    2024年02月12日
    瀏覽(37)
  • SSE與WebSocket分別實現(xiàn)服務器發(fā)送消息通知(Golang、Gin)

    SSE與WebSocket分別實現(xiàn)服務器發(fā)送消息通知(Golang、Gin)

    服務端推送,也稱為消息推送或通知推送,是一種允許應用服務器主動將信息發(fā)送到客戶端的能力,為客戶端提供了實時的信息更新和通知,增強了用戶體驗。 服務端推送的背景與需求主要基于以下幾個訴求: 實時通知:在很多情況下,用戶期望實時接收到應用的通知,如

    2024年02月03日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包