国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

《消息隊列MyMQ》——參考RabbitMQ實現(xiàn)

這篇具有很好參考價值的文章主要介紹了《消息隊列MyMQ》——參考RabbitMQ實現(xiàn)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

一、什么是消息隊列?

二、需求分析

1)核心概念

2)核心API

3)交換機類型

4)持久化

5)網(wǎng)絡通信

?編輯?6)消息應答

三、 模塊劃分

四、創(chuàng)建核心類

1.ExChange

2.MSGQueue

?3.Binding

4.?Message

五. 數(shù)據(jù)庫設計

?1.配置 sqlite

引? pom.xml 依賴

?配置數(shù)據(jù)源 application.yml

2.實現(xiàn)創(chuàng)建表和數(shù)據(jù)庫基本操作

?3.實現(xiàn) DataBaseManager

4.測試 DataBaseManager

六. 消息存儲設計

1.設計思路

?2.創(chuàng)建 MessageFileManager 類

?1)實現(xiàn)統(tǒng)計?件讀寫

?2)實現(xiàn)創(chuàng)建隊列?錄

3)實現(xiàn)刪除隊列?錄?

4)檢查隊列?件是否存在?

?5)實現(xiàn)消息對象序列化/反序列化

?6)實現(xiàn)寫?消息?件

?7)實現(xiàn)刪除消息

?8)實現(xiàn)消息加載

?9)實現(xiàn)垃圾回收(GC)

10)測試 MessageFileManager?

?七. 整合數(shù)據(jù)庫和?件

?八. 內(nèi)存數(shù)據(jù)結構設計

?1.封裝 Exchange 、Queue 、Binding 、Message 方法

2.針對未確認的消息的處理?

3.實現(xiàn)重啟后恢復內(nèi)存?

4.測試 MemoryDataCenter?

九. 虛擬主機設計?

1.創(chuàng)建 VirtualHost?

2.實現(xiàn)構造?法和 getter?

3.創(chuàng)建交換機?

4.刪除交換機?

5.創(chuàng)建隊列?

6.刪除隊列?

?7.創(chuàng)建綁定

8.刪除綁定?

?9.發(fā)布消息

10.路由規(guī)則?

1) 實現(xiàn) route ?法

2) 實現(xiàn) checkRoutingKeyValid

3) 實現(xiàn) checkBindingKeyValid?

4) 實現(xiàn) routeTopic

?6) 測試 Router

11.訂閱消息

1) 添加?個訂閱者

2) 創(chuàng)建訂閱者管理管理類?

?3) 添加令牌接?

?4) 實現(xiàn)添加訂閱者

5) 實現(xiàn)掃描線程?

?6) 實現(xiàn)消費消息

?7)?結

?12.消息確認

13. 測試 VirtualHost

??. ?絡通信協(xié)議設計

1.明確需求

2.設計應?層協(xié)議?

1)請求?編輯

2)響應??編輯

3.定義 Request / Response

4.定義參數(shù)?類

5.定義返回值?類?

?6.定義其他參數(shù)類

? 1) ExchangeDeclareArguments

2) ExchangeDeleteArguments?

?3) QueueDeclareArguments

4) QueueDeleteArguments?

5) QueueBindArguments?

6) QueueUnbindArguments?

7) BasicPublishArguments?

?8) BasicConsumeArguments

9) SubScribeReturns

?一. 實現(xiàn) BrokerServer

1.創(chuàng)建 BrokerServer 類

?2.啟動/停?服務器

?3.實現(xiàn)處理連接

?4.實現(xiàn) readRequest

5.實現(xiàn) writeResponse?

6.實現(xiàn)處理請求

?7.實現(xiàn) clearClosedSession

??二. 實現(xiàn)客?端

1.創(chuàng)建 ConnectionFactory

?2.Connection 和 Channel 的定義

1) Connection 的定義?

?2) Channel 的定義

?3.封裝請求響應讀寫操作

4.創(chuàng)建 channel??

5.發(fā)送請求

1) 創(chuàng)建 channel

?2) 關閉 channel

3) 創(chuàng)建交換機

?4) 刪除交換機

5) 創(chuàng)建隊列

6) 刪除隊列

?7) 創(chuàng)建綁定

8) 刪除綁定

?9) 發(fā)送消息

10) 訂閱消息

?11) 確認消息

6.處理響應

1) 創(chuàng)建掃描線程

2) 實現(xiàn)響應的分發(fā)

3) 實現(xiàn) channel.putReturns?

?7.關閉 Connection

?8.測試客?端-服務器

?三. 案例: 基于 MQ 的?產(chǎn)者消費者模型

1.生產(chǎn)者

?2.消費者

3.運行結果

十四.總結


一、什么是消息隊列?

??

消息隊列是一種用于在應用程序之間或不同組件之間進行異步通信的軟件架構模式。它允許發(fā)送方(生產(chǎn)者)將消息發(fā)送到隊列中,而接收方(消費者)可以從隊列中獲取消息并進行處理。

消息隊列的主要作用是解耦和異步化系統(tǒng)的各個組件。發(fā)送方可以將消息放入隊列中后立即繼續(xù)執(zhí)行其他任務,而無需等待接收方的響應。接收方則可以在適當?shù)臅r候從隊列中獲取消息并進行處理,而不需要與發(fā)送方實時交互。

消息隊列可以用于各種場景,例如:

  1. 異步任務處理:將耗時的任務放入消息隊列中,由后臺的工作線程或服務進行處理,避免阻塞主線程或前端用戶界面。

  2. 系統(tǒng)解耦:不同的組件可以通過消息隊列進行通信,降低組件之間的耦合度,使系統(tǒng)更加靈活和可擴展。

  3. 流量控制:當系統(tǒng)負載過高時,可以通過消息隊列來緩沖和限制請求的處理速度,以保護系統(tǒng)的穩(wěn)定性。

  4. 日志處理:將應用程序的日志信息發(fā)送到消息隊列中,然后由其他組件進行處理和分析。

常見的消息隊列系統(tǒng)包括 RabbitMQ、Apache Kafka、ActiveMQ 等。它們提供了各種功能和特性,以滿足不同場景下的需求。

二、需求分析

1)核心概念

? ?產(chǎn)者 (Producer)

? 消費者 (Consumer)

? 中間? (Broker)

? 發(fā)布 (Publish)

? 訂閱 (Subscribe)

《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式其中, Broker 是最核?的部分. 負責消息的存儲和轉發(fā).

在 Broker 中, ?存在以下概念.

虛擬機 (VirtualHost): 類似于 MySQL 的 "database", 是?個邏輯上的集合. ?個 BrokerServer 上可 以存在多個 VirtualHost.?

交換機 (Exchange): ?產(chǎn)者把消息先發(fā)送到 Broker 的 Exchange 上. 再根據(jù)不同的規(guī)則, 把消息轉發(fā) 給不同的 Queue.

?隊列 (Queue): 真正?來存儲消息的部分. 每個消費者決定??從哪個 Queue 上讀取消息.

綁定 (Binding): Exchange 和 Queue 之間的關聯(lián)關系. Exchange 和 Queue 可以理解成 "多對多" 關 系. 使??個關聯(lián)表就可以把這兩個概念聯(lián)系起來.?

消息 (Message): 傳遞的內(nèi)容.

《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式這些概念, 既需要在內(nèi)存中存儲, 也需要在硬盤上存儲.

內(nèi)存存儲: ?便使?.硬盤存儲: 重啟數(shù)據(jù)不丟失.

2)核心API

對于 Broker 來說, 要實現(xiàn)以下核? API. 通過這些 API 來實現(xiàn)消息隊列的基本功能.

1. 創(chuàng)建隊列 (queueDeclare)

2. 銷毀隊列 (queueDelete)

3. 創(chuàng)建交換機 (exchangeDeclare)

4. 銷毀交換機 (exchangeDelete)

5. 創(chuàng)建綁定 (queueBind)

6. 解除綁定 (queueUnbind)

7. 發(fā)布消息 (basicPublish)

8. 訂閱消息 (basicConsume)

9. 確認消息 (basicAck)

?另???, Producer 和 Consumer 則通過?絡的?式, 遠程調(diào)?這些 API, 實現(xiàn) ?產(chǎn)者消費者模型.

3)交換機類型

對于 RabbitMQ 來說, 主要?持四種交換機類型:Direct 、?Fanout 、?Topic 、?Header.我們這里實現(xiàn)前三種.

Direct: ?產(chǎn)者發(fā)送消息時, 直接指定被該交換機綁定的隊列名.

Fanout: ?產(chǎn)者發(fā)送的消息會被復制到該交換機的所有隊列中.?

Topic: 綁定隊列到交換機上時, 指定?個字符串為 bindingKey. 發(fā)送消息指定?個字符串為 routingKey. 當 routingKey 和 bindingKey 滿??定的匹配條件的時候, 則把消息投遞到指定隊列.

4)持久化

Exchange, Queue, Binding, Message 都有持久化需求.

當程序重啟 / 主機重啟, 保證上述內(nèi)容不丟失.

5)網(wǎng)絡通信

?產(chǎn)者和消費者都是客?端程序, broker 則是作為服務器. 通過?絡進?通信.

在?絡通信的過程中, 客?端部分要提供對應的 api, 來實現(xiàn)對服務器的操作.

1. 創(chuàng)建 Connection

2. 關閉 Connection

3. 創(chuàng)建 Channel

4. 關閉 Channel

5. 創(chuàng)建隊列 (queueDeclare)

6. 銷毀隊列 (queueDelete)

7. 創(chuàng)建交換機 (exchangeDeclare)

8. 銷毀交換機 (exchangeDelete)

9. 創(chuàng)建綁定 (queueBind)

10. 解除綁定 (queueUnbind)

11. 發(fā)布消息 (basicPublish)

12. 訂閱消息 (basicConsume)

13. 確認消息 (basicAck)

?Connection 對應?個 TCP 連接. Channel 則是 Connection 中的邏輯通道. ?個 Connection 中可以包含多個 Channel. Channel 和 Channel 之間的數(shù)據(jù)是獨?的. 不會相互?擾. 這樣的設定主要是為了能夠更好的復? TCP 連接, 達到?連接的效果, 避免頻繁的創(chuàng)建關閉 TCP 連接.

Connection 可以理解成?根?線. Channel 則是?線?具體的線纜.

《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式?6)消息應答

被消費的消息, 需要進?應答. 應答模式分成兩種.

? ?動應答: 消費者只要消費了消息, 就算應答完畢了. Broker 直接刪除這個消息.

? ?動應答: 消費者?動調(diào)?應答接?, Broker 收到應答請求之后, 才真正刪除這個消息.

(?動應答的?的, 是為了保證消息確實被消費者處理成功了. 在?些對于數(shù)據(jù)可靠性要求?的場景, ? 較常?.)

三、 模塊劃分

《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式

?可以看到, 像 交換機, 隊列, 綁定, 消息, 這?個核?概念在內(nèi)存和硬盤中都是存儲了的. 其中內(nèi)存為主, 是?來實現(xiàn)消息轉發(fā)的關鍵; 硬盤為輔, 主要是保證服務器重啟之后, 之前的信息都可以 正常保持.

四、創(chuàng)建核心類

1.ExChange

public class Exchange {
    private String name;
    private ExchangeType type = ExchangeType.DIRECT;
    private boolean durable = false;

    //省略 getter setter
} 
public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);
    private final int type;
    private ExchangeType(int type) {
        this.type = type;
    }
    public int getType() {
        return this.type;
    }
}

name : 交換機的名字. 相當于交換機的?份標識.?

type : 交換機的類型. 三種取值, DIRECT, FANOUT, TOPIC.

durable : 交換機是否要持久化存儲. true 為持久化, false 不持久化.

?RabbitMQ 中的交換機, 還?持 autoDelete(使?完畢后是否?動刪除) 和 arguments(交換機的其他參數(shù)屬性) , 我們這里不實現(xiàn)這兩個功能。

2.MSGQueue

public class MSGQueue {
    private String name;
    private boolean durable;

    // 省略 getter setter
}

name : 隊列的名字. 相當于隊列的?份標識.

durable : 交換機是否要持久化存儲. true 為持久化, false 不持久化.

exclusive : 獨占(排他), 隊列只能被?個消費者使?.

autoDelete : 使?完畢后是否?動刪除.

arguments : 交換機的其他參數(shù)屬性.

RabbitMQ還實現(xiàn)了以上功能,我們這里不實現(xiàn)

?3.Binding

public class Binding {
    private String exchangeName;
    private String queueName;
    private String bindingKey;
    // 省略 getter setter
}

exchangeName 交換機名字

queueName 隊列名字

bindingKey 只在交換機類型為 TOPIC 時才有效. ?于和消息中的 routingKey 進?匹配

4.?Message

public class Message implements Serializable {
    private BasicProperties basicProperties = new BasicProperties();
    private byte[] body;

    // 消息在?件中對應的 offset 的范圍, [offsetBeg, offsetEnd)
    // 從這個范圍取出的 byte[] 正好可以反序列化成?個 Message 對象.
    // offsetBeg 前?的 4 個字節(jié)是消息的?度
    private transient long offsetBeg = 0;
    private transient long offsetEnd = 0;
    
    private byte isValid = 0x1; // 消息在?件中是否有效. 0x0 表??效, 0x1 表?有效
    // 創(chuàng)建新的消息, 同時給該消息分配?個新的 messageId
    // routingKey 以參數(shù)的為準. 會覆蓋掉 basicProperties 中的 routingKey
    public static Message createMessageWithId(String routingKey, BasicProperties)
         Message message = new Message();
        if (basicProperties != null) {
            message.basicProperties = basicProperties;
        }
        message.basicProperties.setMessageId("M-" + UUID.randomUUID().toString()
        message.basicProperties.setRoutingKey(routingKey);
        message.body = body;
        return message;
    }
// 省略 getter setter
}
public class BasicProperties implements Serializable {
  
    private String messageId;   // 消息的唯? id. 使? uuid 表?.
    private String routingKey;
    private int deliveryMode = 1; // 1 表?消息?持久化. 2 表?消息持久化
    // 省略 getter setter
}

Message 需要實現(xiàn) Serializable 接?. 后續(xù)需要把 Message 寫??件以及進??絡傳輸. basicProperties 是消息的屬性信息. body 是消息體.

offsetBeg 和 offsetEnd 表?消息在消息?件中所在的起始位置和結束位置.使? transient 關鍵字避免屬性被序列化.

isValid ?來表?消息在?件中是否有效. 這?塊具體的設計后?再詳細介紹. createMessageWithId 相當于?個???法, ?來創(chuàng)建?個 Message 實例.

messageId 通過 UUID 的?式?成.

五. 數(shù)據(jù)庫設計

對于 Exchange, MSGQueue, Binding, 我們使?數(shù)據(jù)庫進?持久化保存. 此處我們使?的數(shù)據(jù)庫是 SQLite, 是?個更輕量的數(shù)據(jù)庫. SQLite 只是?個動態(tài)庫(當然, 官?也提供了可執(zhí)?程序 exe), 我們在 Java 中直接引? SQLite 依賴, 即 可直接使?, 不必安裝其他的軟件.

?1.配置 sqlite

引? pom.xml 依賴

<dependency>
    <groupId>org.xerial</groupId>
    <artifactId>sqlite-jdbc</artifactId>
    <version>3.41.0.1</version>
</dependency>

?配置數(shù)據(jù)源 application.yml

spring:
    datasource:
        url: jdbc:sqlite:./data/meta.db
        username:
        password:
        driver-class-name: org.sqlite.JDBC

mybatis:
mapper-locations: classpath:mapper/**Mapper.xml

此處我們約定, 把數(shù)據(jù)庫?件放到 ./data/meta.db 中. SQLite 只是把數(shù)據(jù)單純的存儲到?個?件中. ?常簡單?便.

2.實現(xiàn)創(chuàng)建表和數(shù)據(jù)庫基本操作

mapper.MetaMapper

public interface MetaMapper {
    // 提供三個核心建表方法
    void createExchangeTable();
    void createQueueTable();
    void createBindingTable();

    // 插入 刪除 和 查找
    void insertExchange(Exchange exchange);
    void deleteExchange(String exchangeName);
    List<Exchange> selectAllExchanges();
    void insertQueue(MSGQueue queue);
    void deleteQueue(String queueName);
    List<MSGQueue> selectAllQueues();
    void insertBinding(Binding binding);
    void deleteBinding(Binding binding);
    List<Binding> selectAllBindings();
}

?MetaMapper

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper">
    <update id="createExchangeTable">
        create table if not exists exchange (
            name varchar(50) primary key,
            type int,
            durable boolean
            );
    </update>

    <update id="createQueueTable">
        create table if not exists queue (
            name varchar(50) primary key,
            durable boolean
            );
    </update>

    <update id="createBindingTable">
        create table if not exists binding (
            exchangeName varchar(50),
            queueName varchar(50),
            bindingKey varchar(256)
            );
    </update>

    <insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">
        insert into exchange values(#{name}, #{type}, #{durable});
    </insert>

    <select id="selectAllExchanges" resultType="com.example.mq.mqserver.core.Exchange">
        select * from exchange;
    </select>

    <delete id="deleteExchange" parameterType="java.lang.String">
        delete from exchange where name = #{exchangeName};
    </delete>

    <insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">
        insert into queue values(#{name}, #{durable});
    </insert>

    <select id="selectAllQueues" resultType="com.example.mq.mqserver.core.MSGQueue">
        select * from queue;
    </select>

    <delete id="deleteQueue" parameterType="java.lang.String">
        delete from queue where name = #{queueName};
    </delete>

    <insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">
        insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});
    </insert>

    <select id="selectAllBindings" resultType="com.example.mq.mqserver.core.Binding">
        select * from binding;
    </select>

    <delete id="deleteBinding" parameterType="com.example.mq.mqserver.core.Binding">
        delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
    </delete>
</mapper>

?3.實現(xiàn) DataBaseManager

mqserver.datacenter.DataBaseManager


/*
 * 通過這個類, 來整合數(shù)據(jù)庫操作.
 */
public class DataBaseManager {
    // 要做的是從 Spring 中拿到現(xiàn)成的對象
    private MetaMapper metaMapper;

    // 針對數(shù)據(jù)庫進行初始化
    public void init() {
        // 手動的獲取到 MetaMapper
        metaMapper = MqApplication.context.getBean(MetaMapper.class);

        if (!checkDBExists()) {
            // 數(shù)據(jù)庫不存在, 就進行建建庫表操作
            // 先創(chuàng)建一個 data 目錄
            File dataDir = new File("./data");
            dataDir.mkdirs();
            // 創(chuàng)建數(shù)據(jù)表
            createTable();
            // 插入默認數(shù)據(jù)
            createDefaultData();
            System.out.println("[DataBaseManager] 數(shù)據(jù)庫初始化完成!");
        } else {
            // 數(shù)據(jù)庫已經(jīng)存在了, 啥都不必做即可
            System.out.println("[DataBaseManager] 數(shù)據(jù)庫已經(jīng)存在!");
        }
    }

    public void deleteDB() {
        File file = new File("./data/meta.db");
        boolean ret = file.delete();
        if (ret) {
            System.out.println("[DataBaseManager] 刪除數(shù)據(jù)庫文件成功!");
        } else {
            System.out.println("[DataBaseManager] 刪除數(shù)據(jù)庫文件失敗!");
        }

        File dataDir = new File("./data");
        // 使用 delete 刪除目錄的時候, 需要保證目錄是空的.
        ret = dataDir.delete();
        if (ret) {
            System.out.println("[DataBaseManager] 刪除數(shù)據(jù)庫目錄成功!");
        } else {
            System.out.println("[DataBaseManager] 刪除數(shù)據(jù)庫目錄失敗!");
        }
    }

    private boolean checkDBExists() {
        File file = new File("./data/meta.db");
        if (file.exists()) {
            return true;
        }
        return false;
    }

    // 這個方法用來建表.
    // 建庫操作并不需要手動執(zhí)行. (不需要手動創(chuàng)建 meta.db 文件)
    // 首次執(zhí)行這里的數(shù)據(jù)庫操作的時候, 就會自動的創(chuàng)建出 meta.db 文件來 (MyBatis 幫我們完成的)
    private void createTable() {
        metaMapper.createExchangeTable();
        metaMapper.createQueueTable();
        metaMapper.createBindingTable();
        System.out.println("[DataBaseManager] 創(chuàng)建表完成!");
    }

    // 給數(shù)據(jù)庫表中, 添加默認的數(shù)據(jù).
    // 此處主要是添加一個默認的交換機.
    // RabbitMQ 里有一個這樣的設定: 帶有一個 匿名 的交換機, 類型是 DIRECT.
    private void createDefaultData() {
        // 構造一個默認的交換機.
        Exchange exchange = new Exchange();
        exchange.setName("");
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        metaMapper.insertExchange(exchange);
        System.out.println("[DataBaseManager] 創(chuàng)建初始數(shù)據(jù)完成!");
    }

    // 把其他的數(shù)據(jù)庫的操作, 也在這個類中封裝一下.
    public void insertExchange(Exchange exchange) {
        metaMapper.insertExchange(exchange);
    }

    public List<Exchange> selectAllExchanges() {
        return metaMapper.selectAllExchanges();
    }

    public void deleteExchange(String exchangeName) {
        metaMapper.deleteExchange(exchangeName);
    }

    public void insertQueue(MSGQueue queue) {
        metaMapper.insertQueue(queue);
    }

    public List<MSGQueue> selectAllQueues() {
        return metaMapper.selectAllQueues();
    }

    public void deleteQueue(String queueName) {
        metaMapper.deleteQueue(queueName);
    }

    public void insertBinding(Binding binding) {
        metaMapper.insertBinding(binding);
    }

    public List<Binding> selectAllBindings() {
        return metaMapper.selectAllBindings();
    }

    public void deleteBinding(Binding binding) {
        metaMapper.deleteBinding(binding);
    }
}

4.測試 DataBaseManager

// 加上這個注解之后, 改類就會被識別為單元測試類.
@SpringBootTest
public class DataBaseManagerTests {
    private DataBaseManager dataBaseManager = new DataBaseManager();

    // 接下來下面這里需要編寫多個 方法 . 每個方法都是一個/一組單元測試用例.
    // 還需要做一個準備工作. 需要寫兩個方法, 分別用于進行 "準備工作" 和 "收尾工作"

    // 使用這個方法, 來執(zhí)行準備工作. 每個用例執(zhí)行前, 都要調(diào)用這個方法.
    @BeforeEach
    public void setUp() {
        // 由于在 init 中, 需要通過 context 對象拿到 metaMapper 實例的.
        // 所以就需要先把 context 對象給搞出來.
        MqApplication.context = SpringApplication.run(MqApplication.class);
        dataBaseManager.init();
    }

    // 使用這個方法, 來執(zhí)行收尾工作. 每個用例執(zhí)行后, 都要調(diào)用這個方法.
    @AfterEach
    public void tearDown() {
        // 這里要進行的操作, 就是把數(shù)據(jù)庫給清空~~ (把數(shù)據(jù)庫文件, meta.db 直接刪了就行了)
        // 注意, 此處不能直接就刪除, 而需要先關閉上述 context 對象!!
        // 此處的 context 對象, 持有了 MetaMapper 的實例, MetaMapper 實例又打開了 meta.db 數(shù)據(jù)庫文件.
        // 如果 meta.db 被別人打開了, 此時的刪除文件操作是不會成功的 (Windows 系統(tǒng)的限制, Linux 則沒這個問題).
        // 另一方面, 獲取 context 操作, 會占用 8080 端口. 此處的 close 也是釋放 8080.
        MqApplication.context.close();
        dataBaseManager.deleteDB();
    }

    @Test
    public void testInitTable() {
        // 由于 init 方法, 已經(jīng)在上面 setUp 中調(diào)用過了. 直接在測試用例代碼中, 檢查當前的數(shù)據(jù)庫狀態(tài)即可.
        // 直接從數(shù)據(jù)庫中查詢. 看數(shù)據(jù)是否符合預期.
        // 查交換機表, 里面應該有一個數(shù)據(jù)(匿名的 exchange); 查隊列表, 沒有數(shù)據(jù); 查綁定表, 沒有數(shù)據(jù).
        List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
        List<MSGQueue> queueList = dataBaseManager.selectAllQueues();
        List<Binding> bindingList = dataBaseManager.selectAllBindings();

        // 直接打印結果, 通過肉眼來檢查結果, 固然也可以. 但是不優(yōu)雅, 不方便.
        // 更好的辦法是使用斷言.
        // System.out.println(exchangeList.size());
        // assertEquals 判定結果是不是相等.
        // 注意這倆參數(shù)的順序. 雖然比較相等, 誰在前誰在后, 無所謂.
        // 但是 assertEquals 的形參, 第一個形參叫做 expected (預期的), 第二個形參叫做 actual (實際的)
        Assertions.assertEquals(1, exchangeList.size());
        Assertions.assertEquals("", exchangeList.get(0).getName());
        Assertions.assertEquals(ExchangeType.DIRECT, exchangeList.get(0).getType());
        Assertions.assertEquals(0, queueList.size());
        Assertions.assertEquals(0, bindingList.size());
    }

    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.FANOUT);
        exchange.setDurable(true);
        return exchange;
    }

    @Test
    public void testInsertExchange() {
        // 構造一個 Exchange 對象, 插入到數(shù)據(jù)庫中. 再查詢出來, 看結果是否符合預期.
        Exchange exchange = createTestExchange("testExchange");
        dataBaseManager.insertExchange(exchange);
        // 插入完畢之后, 查詢結果
        List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(2, exchangeList.size());
        Exchange newExchange = exchangeList.get(1);
        Assertions.assertEquals("testExchange", newExchange.getName());
        Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());
        Assertions.assertEquals(true, newExchange.isDurable());
    }

    @Test
    public void testDeleteExchange() {
        // 先構造一個交換機, 插入數(shù)據(jù)庫; 然后再按照名字刪除即可!
        Exchange exchange = createTestExchange("testExchange");
        dataBaseManager.insertExchange(exchange);
        List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(2, exchangeList.size());
        Assertions.assertEquals("testExchange", exchangeList.get(1).getName());

        // 進行刪除操作
        dataBaseManager.deleteExchange("testExchange");
        // 再次查詢
        exchangeList = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(1, exchangeList.size());
        Assertions.assertEquals("", exchangeList.get(0).getName());
    }

    private MSGQueue createTestQueue(String queueName) {
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        return queue;
    }

    @Test
    public void testInsertQueue() {
        MSGQueue queue = createTestQueue("testQueue");
        dataBaseManager.insertQueue(queue);

        List<MSGQueue> queueList = dataBaseManager.selectAllQueues();

        Assertions.assertEquals(1, queueList.size());
        MSGQueue newQueue = queueList.get(0);
        Assertions.assertEquals("testQueue", newQueue.getName());
        Assertions.assertEquals(true, newQueue.isDurable());
    }

    @Test
    public void testDeleteQueue() {
        MSGQueue queue = createTestQueue("testQueue");
        dataBaseManager.insertQueue(queue);
        List<MSGQueue> queueList = dataBaseManager.selectAllQueues();
        Assertions.assertEquals(1, queueList.size());
        // 進行刪除
        dataBaseManager.deleteQueue("testQueue");
        queueList = dataBaseManager.selectAllQueues();
        Assertions.assertEquals(0, queueList.size());
    }

    private Binding createTestBinding(String exchangeName, String queueName) {
        Binding binding = new Binding();
        binding.setExchangeName(exchangeName);
        binding.setQueueName(queueName);
        binding.setBindingKey("testBindingKey");
        return binding;
    }

    @Test
    public void testInsertBinding() {
        Binding binding = createTestBinding("testExchange", "testQueue");
        dataBaseManager.insertBinding(binding);

        List<Binding> bindingList = dataBaseManager.selectAllBindings();
        Assertions.assertEquals(1, bindingList.size());
        Assertions.assertEquals("testExchange", bindingList.get(0).getExchangeName());
        Assertions.assertEquals("testQueue", bindingList.get(0).getQueueName());
        Assertions.assertEquals("testBindingKey", bindingList.get(0).getBindingKey());
    }

    @Test
    public void testDeleteBinding() {
        Binding binding = createTestBinding("testExchange", "testQueue");
        dataBaseManager.insertBinding(binding);
        List<Binding> bindingList = dataBaseManager.selectAllBindings();
        Assertions.assertEquals(1, bindingList.size());

        // 刪除
        Binding toDeleteBinding = createTestBinding("testExchange", "testQueue");
        dataBaseManager.deleteBinding(toDeleteBinding);
        bindingList = dataBaseManager.selectAllBindings();
        Assertions.assertEquals(0, bindingList.size());
    }
}

六. 消息存儲設計

1.設計思路

消息需要在硬盤上存儲. 但是并不直接放到數(shù)據(jù)庫中, ?是直接使??件存儲.因為:

1. 對于消息的操作并不需要復雜的 增刪改查 .

2. 對于?件的操作效率?數(shù)據(jù)庫會?很多.

我們給每個隊列分配?個?錄. ?錄的名字為 data + 隊列名. 形如 ./data/testQueue 該?錄中包含兩個固定名字的?件.

queue_data.txt 消息數(shù)據(jù)?件, ?來保存消息內(nèi)容.

queue_stat.txt 消息統(tǒng)計?件, ?來保存消息統(tǒng)計信息.

?queue_data.txt ?件格式: 使??進制?式存儲. 每個消息分成兩個部分:

前四個字節(jié), 表? Message 對象的?度(字節(jié)數(shù))

后?若?字節(jié), 表? Message 內(nèi)容.

消息和消息之間?尾相連.

?《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式

queue_stat.txt ?件格式:?使??本?式存儲.

?件中只包含??, ??包含兩列(都是整數(shù)), 使? \t 分割.

第?列表?當前總的消息數(shù)?. 第?列表?有效消息數(shù)?.

形如:??2000\t1500

?2.創(chuàng)建 MessageFileManager 類

mqserver.database.MessageFileManager

public class MessageFileManager {
    // 表?消息的統(tǒng)計信息
    static public class Stat {
        public int totalCount;
        public int validCount;
    }

    public void init() {
        // 當前這?不需要做任何?作.
    }

    // 隊列?錄
    private String getQueueDir(String queueName) {
        return "./data/" + queueName;
    }

    // 隊列數(shù)據(jù)?件
    // 這個?件來存儲隊列的真實數(shù)據(jù)
    private String getQueueDataPath(String queueName) {
        return getQueueDir(queueName) + "/queue_data.txt";
    }

    // 隊列統(tǒng)計?件
    // 這個?件?來存儲隊列中的統(tǒng)計信息.
    // 包含??, 兩個列使? \t 分割, 分別是總數(shù)據(jù), 和?效數(shù)據(jù).
    private String getQueueStatPath(String queueName) {
        return getQueueDir(queueName) + "/queue_stat.txt";
    }
}

?1)實現(xiàn)統(tǒng)計?件讀寫

    private Stat readStat(String queueName) {
        // 由于當前的消息統(tǒng)計文件是文本文件, 可以直接使用 Scanner 來讀取文件內(nèi)容
        Stat stat = new Stat();
        try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    private void writeStat(String queueName, Stat stat) {
        // 使用 PrintWrite 來寫文件.
        // OutputStream 打開文件, 默認情況下, 會直接把原文件清空. 此時相當于新的數(shù)據(jù)覆蓋了舊的.
        try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName)))             {
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount + "\t" + stat.validCount);
            printWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

?2)實現(xiàn)創(chuàng)建隊列?錄

每個隊列都有??的?錄和配套的?件. 通過下列?法把?錄和?件先準備好.

 // 創(chuàng)建隊列對應的文件和目錄
    public void createQueueFiles(String queueName) throws IOException {
        // 1. 先創(chuàng)建隊列對應的消息目錄
        File baseDir = new File(getQueueDir(queueName));
        if (!baseDir.exists()) {
            // 不存在, 就創(chuàng)建這個目錄
            boolean ok = baseDir.mkdirs();
            if (!ok) {
                throw new IOException("創(chuàng)建目錄失敗! baseDir=" + baseDir.getAbsolutePath());
            }
        }
        // 2. 創(chuàng)建隊列數(shù)據(jù)文件
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
            boolean ok = queueDataFile.createNewFile();
            if (!ok) {
                throw new IOException("創(chuàng)建文件失敗! queueDataFile=" + queueDataFile.getAbsolutePath());
            }
        }
        // 3. 創(chuàng)建消息統(tǒng)計文件
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
            boolean ok = queueStatFile.createNewFile();
            if (!ok) {
                throw new IOException("創(chuàng)建文件失敗! queueStatFile=" + queueStatFile.getAbsolutePath());
            }
        }
        // 4. 給消息統(tǒng)計文件, 設定初始值. 0\t0
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        writeStat(queueName, stat);
    }

3)實現(xiàn)刪除隊列?錄?

    public void destroyQueueFiles(String queueName) throws IOException {
        // 先刪除里面的文件, 再刪除目錄.
        File queueDataFile = new File(getQueueDataPath(queueName));
        boolean ok1 = queueDataFile.delete();
        File queueStatFile = new File(getQueueStatPath(queueName));
        boolean ok2 = queueStatFile.delete();
        File baseDir = new File(getQueueDir(queueName));
        boolean ok3 = baseDir.delete();
        if (!ok1 || !ok2 || !ok3) {
            // 有任意一個刪除失敗, 都算整體刪除失敗.
            throw new IOException("刪除隊列目錄和文件失敗! baseDir=" + baseDir.getAbsolutePath());
        }
    }

注意: File 類的 delete ?法只能刪除空?錄. 因此需要先把內(nèi)部的?件先刪除掉.

4)檢查隊列?件是否存在?

判定該隊列的消息?件和統(tǒng)計?件是否存在. ?旦出現(xiàn)缺失, 則不能進?后續(xù)?作.

    // 檢查隊列的目錄和文件是否存在.
    // 比如后續(xù)有生產(chǎn)者給 broker server 生產(chǎn)消息了, 這個消息就可能需要記錄到文件上(取決于消息是否要持久化)
    public boolean checkFilesExits(String queueName) {
        // 判定隊列的數(shù)據(jù)文件和統(tǒng)計文件是否都存在!!
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
            return false;
        }
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
            return false;
        }
        return true;
    }

?5)實現(xiàn)消息對象序列化/反序列化

Message 對象需要轉成?進制寫??件. 并且也需要把?件中的?進制讀出來解析成 Message 對象. 此 處針對這?的邏輯進?封裝.

創(chuàng)建 common.BinaryTool

public class BinaryTool {
    // 把一個對象序列化成一個字節(jié)數(shù)組
    public static byte[] toBytes(Object object) throws IOException {
        // 這個流對象相當于一個變長的字節(jié)數(shù)組.
        // 就可以把 object 序列化的數(shù)據(jù)給逐漸的寫入到 byteArrayOutputStream 中, 再統(tǒng)一轉成 byte[]
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
                // 此處的 writeObject 就會把該對象進行序列化, 生成的二進制字節(jié)數(shù)據(jù), 就會寫入到
                // ObjectOutputStream 中.
                // 由于 ObjectOutputStream 又是關聯(lián)到了 ByteArrayOutputStream, 最終結果就寫入到 ByteArrayOutputStream 里了
                objectOutputStream.writeObject(object);
            }
            // 這個操作就是把 byteArrayOutputStream 中持有的二進制數(shù)據(jù)取出來, 轉成 byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }

    // 把一個字節(jié)數(shù)組, 反序列化成一個對象
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
                // 此處的 readObject, 就是從 data 這個 byte[] 中讀取數(shù)據(jù)并進行反序列化.
                object = objectInputStream.readObject();
            }
        }
        return object;
    }
}

?6)實現(xiàn)寫?消息?件

    // 這個方法用來把一個新的消息, 放到隊列對應的文件中.
    // queue 表示要把消息寫入的隊列. message 則是要寫的消息.
    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
        // 1. 檢查一下當前要寫入的隊列對應的文件是否存在.
        if (!checkFilesExits(queue.getName())) {
            throw new MqException("[MessageFileManager] 隊列對應的文件不存在! queueName=" + queue.getName());
        }
        // 2. 把 Message 對象, 進行序列化, 轉成二進制的字節(jié)數(shù)組.
        byte[] messageBinary = BinaryTool.toBytes(message);
        synchronized (queue) {
            // 3. 先獲取到當前的隊列數(shù)據(jù)文件的長度, 用這個來計算出該 Message 對象的 offsetBeg 和 offsetEnd
            // 把新的 Message 數(shù)據(jù), 寫入到隊列數(shù)據(jù)文件的末尾. 此時 Message 對象的 offsetBeg , 就是當前文件長度 + 4
            // offsetEnd 就是當前文件長度 + 4 + message 自身長度.
            File queueDataFile = new File(getQueueDataPath(queue.getName()));
            // 通過這個方法 queueDataFile.length() 就能獲取到文件的長度. 單位字節(jié).
            message.setOffsetBeg(queueDataFile.length() + 4);
            message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
            // 4. 寫入消息到數(shù)據(jù)文件, 注意, 是追加寫入到數(shù)據(jù)文件末尾.
            try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    // 接下來要先寫當前消息的長度, 占據(jù) 4 個字節(jié)的~~
                    dataOutputStream.writeInt(messageBinary.length);
                    // 寫入消息本體
                    dataOutputStream.write(messageBinary);
                }
            }
            // 5. 更新消息統(tǒng)計文件
            Stat stat = readStat(queue.getName());
            stat.totalCount += 1;
            stat.validCount += 1;
            writeStat(queue.getName(), stat);
        }
    }

? 考慮線程安全, 按照隊列維度進?加鎖.

? 使? DataOutputStream 進??進制寫操作. ?原? OutputStream 要?便.

? 需要記錄 Message 對象在?件中的偏移量. 后續(xù)的刪除操作依賴這個偏移量定位到消息. offsetBeg 是原有?件??的基礎上, 再 + 4. 4 個字節(jié)是存放消息??的空間. (參考上?的圖). ? 寫完消息, 要同時更新統(tǒng)計信息

創(chuàng)建 common.MqException , 作為?定義異常類. 后續(xù)業(yè)務上出現(xiàn)問題, 都統(tǒng)?拋出這個異常.?

public class MqException extends Exception {
    public MqException(String message) {
        super(message);
    }
}

?7)實現(xiàn)刪除消息

此處的刪除只是 "邏輯刪除", 即把 Message 類中的 isValid 字段設置為 0.

這樣刪除速度?較快. 實際的徹底刪除, 則通過我們??實現(xiàn)的 GC 來解決.

    // 這個是刪除消息的方法.
    // 這里的刪除是邏輯刪除, 也就是把硬盤上存儲的這個數(shù)據(jù)里面的那個 isValid 屬性, 設置成 0
    // 1. 先把文件中的這一段數(shù)據(jù), 讀出來, 還原回 Message 對象;
    // 2. 把 isValid 改成 0;
    // 3. 把上述數(shù)據(jù)重新寫回到文件.
    // 此處這個參數(shù)中的 message 對象, 必須得包含有效的 offsetBeg 和 offsetEnd
    public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
        synchronized (queue) {
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
                // 1. 先從文件中讀取對應的 Message 數(shù)據(jù).
                byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.read(bufferSrc);
                // 2. 把當前讀出來的二進制數(shù)據(jù), 轉換回成 Message 對象
                Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
                // 3. 把 isValid 設置為無效.
                diskMessage.setIsValid((byte) 0x0);
                // 此處不需要給參數(shù)的這個 message 的 isValid 設為 0, 因為這個參數(shù)代表的是內(nèi)存中管理的 Message 對象
                // 而這個對象馬上也要被從內(nèi)存中銷毀了.
                // 4. 重新寫入文件
                byte[] bufferDest = BinaryTool.toBytes(diskMessage);
                // 雖然上面已經(jīng) seek 過了, 但是上面 seek 完了之后, 進行了讀操作, 這一讀, 就導致, 文件光標往后移動, 移動到
                // 下一個消息的位置了. 因此要想讓接下來的寫入, 能夠剛好寫回到之前的位置, 就需要重新調(diào)整文件光標.
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.write(bufferDest);
                // 通過上述這通折騰, 對于文件來說, 只是有一個字節(jié)發(fā)生改變而已了~~
            }
            // 不要忘了, 更新統(tǒng)計文件!! 把一個消息設為無效了, 此時有效消息個數(shù)就需要 - 1
            Stat stat = readStat(queue.getName());
            if (stat.validCount > 0) {
                stat.validCount -= 1;
            }
            writeStat(queue.getName(), stat);
        }
    }

?8)實現(xiàn)消息加載

   // 使用這個方法, 從文件中, 讀取出所有的消息內(nèi)容, 加載到內(nèi)存中(具體來說是放到一個鏈表里)
    // 這個方法, 準備在程序啟動的時候, 進行調(diào)用.
    // 這里使用一個 LinkedList, 主要目的是為了后續(xù)進行頭刪操作.
    // 這個方法的參數(shù), 只是一個 queueName 而不是 MSGQueue 對象. 因為這個方法不需要加鎖, 只使用 queueName 就夠了.
    // 由于該方法是在程序啟動時調(diào)用, 此時服務器還不能處理請求呢~~ 不涉及多線程操作文件.
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        LinkedList<Message> messages = new LinkedList<>();
        try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {
            try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {
                // 這個變量記錄當前文件光標.
                long currentOffset = 0;
                // 一個文件中包含了很多消息, 此處勢必要循環(huán)讀取.
                while (true) {
                    // 1. 讀取當前消息的長度, 這里的 readInt 可能會讀到文件的末尾(EOF)
                    //    readInt 方法, 讀到文件末尾, 會拋出 EOFException 異常. 這一點和之前的很多流對象不太一樣.
                    int messageSize = dataInputStream.readInt();
                    // 2. 按照這個長度, 讀取消息內(nèi)容
                    byte[] buffer = new byte[messageSize];
                    int actualSize = dataInputStream.read(buffer);
                    if (messageSize != actualSize) {
                        // 如果不匹配, 說明文件有問題, 格式錯亂了!!
                        throw new MqException("[MessageFileManager] 文件格式錯誤! queueName=" + queueName);
                    }
                    // 3. 把這個讀到的二進制數(shù)據(jù), 反序列化回 Message 對象
                    Message message = (Message) BinaryTool.fromBytes(buffer);
                    // 4. 判定一下看看這個消息對象, 是不是無效對象.
                    if (message.getIsValid() != 0x1) {
                        // 無效數(shù)據(jù), 直接跳過.
                        // 雖然消息是無效數(shù)據(jù), 但是 offset 不要忘記更新.
                        currentOffset += (4 + messageSize);
                        continue;
                    }
                    // 5. 有效數(shù)據(jù), 則需要把這個 Message 對象加入到鏈表中. 加入之前還需要填寫 offsetBeg 和 offsetEnd
                    //    進行計算 offset 的時候, 需要知道當前文件光標的位置的. 由于當下使用的 DataInputStream 并不方便直接獲取到文件光標位置
                    //    因此就需要手動計算下文件光標.
                    message.setOffsetBeg(currentOffset + 4);
                    message.setOffsetEnd(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);
                }
            } catch (EOFException e) {
                // 這個 catch 并非真是處理 "異常", 而是處理 "正常" 的業(yè)務邏輯. 文件讀到末尾, 會被 readInt 拋出該異常.
                // 這個 catch 語句中也不需要做啥特殊的事情
                System.out.println("[MessageFileManager] 恢復 Message 數(shù)據(jù)完成!");
            }
        }
        return messages;
    }

?9)實現(xiàn)垃圾回收(GC)

?上述刪除操作, 只是把消息在?件上標記成了?效. 并沒有騰出硬盤空間. 最終?件??可能會越積越 多. 因此需要定期的進?批量清除. 此處使?類似于復制算法. 當總消息數(shù)超過 2000, 并且有效消息數(shù)?少于 50% 的時候, 就觸發(fā) GC. GC 的時候會把所有有效消息加載出來, 寫?到?個新的消息?件中, 使?新?件, 代替舊?件即可.

    // 檢查當前是否要針對該隊列的消息數(shù)據(jù)文件進行 GC
    public boolean checkGC(String queueName) {
        // 判定是否要 GC, 是根據(jù)總消息數(shù)和有效消息數(shù). 這兩個值都是在 消息統(tǒng)計文件 中的.
        Stat stat = readStat(queueName);
        if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount < 0.5) {
            return true;
        }
        return false;
    }

    private String getQueueDataNewPath(String queueName) {
        return getQueueDir(queueName) + "/queue_data_new.txt";
    }

    // 通過這個方法, 真正執(zhí)行消息數(shù)據(jù)文件的垃圾回收操作.
    // 使用復制算法來完成.
    // 創(chuàng)建一個新的文件, 名字就是 queue_data_new.txt
    // 把之前消息數(shù)據(jù)文件中的有效消息都讀出來, 寫到新的文件中.
    // 刪除舊的文件, 再把新的文件改名回 queue_data.txt
    // 同時要記得更新消息統(tǒng)計文件.
    public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
        // 進行 gc 的時候, 是針對消息數(shù)據(jù)文件進行大洗牌. 在這個過程中, 其他線程不能針對該隊列的消息文件做任何修改.
        synchronized (queue) {
            // 由于 gc 操作可能比較耗時, 此處統(tǒng)計一下執(zhí)行消耗的時間.
            long gcBeg = System.currentTimeMillis();

            // 1. 創(chuàng)建一個新的文件
            File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if (queueDataNewFile.exists()) {
                // 正常情況下, 這個文件不應該存在. 如果存在, 就是意外~~ 說明上次 gc 了一半, 程序意外崩潰了.
                throw new MqException("[MessageFileManager] gc 時發(fā)現(xiàn)該隊列的 queue_data_new 已經(jīng)存在! queueName=" + queue.getName());
            }
            boolean ok = queueDataNewFile.createNewFile();
            if (!ok) {
                throw new MqException("[MessageFileManager] 創(chuàng)建文件失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
            }

            // 2. 從舊的文件中, 讀取出所有的有效消息對象了. (這個邏輯直接調(diào)用上述方法即可, 不必重新寫了)
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());

            // 3. 把有效消息, 寫入到新的文件中.
            try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                    for (Message message : messages) {
                        byte[] buffer = BinaryTool.toBytes(message);
                        // 先寫四個字節(jié)消息的長度
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }

            // 4. 刪除舊的數(shù)據(jù)文件, 并且把新的文件進行重命名
            File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
            ok = queueDataOldFile.delete();
            if (!ok) {
                throw new MqException("[MessageFileManager] 刪除舊的數(shù)據(jù)文件失敗! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }
            // 把 queue_data_new.txt => queue_data.txt
            ok = queueDataNewFile.renameTo(queueDataOldFile);
            if (!ok) {
                throw new MqException("[MessageFileManager] 文件重命名失敗! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
                        + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }

            // 5. 更新統(tǒng)計文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(), stat);

            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] gc 執(zhí)行完畢! queueName=" + queue.getName() + ", time="
                    + (gcEnd - gcBeg) + "ms");
        }
    }

10)測試 MessageFileManager?

MessageFileManagerTests

@SpringBootTest
public class MessageFileManagerTests {
    private MessageFileManager messageFileManager = new MessageFileManager();

    private static final String queueName1 = "testQueue1";
    private static final String queueName2 = "testQueue2";

    // 這個方法是每個用例執(zhí)行之前的準備工作
    @BeforeEach
    public void setUp() throws IOException {
        // 準備階段, 創(chuàng)建出兩個隊列, 以備后用
        messageFileManager.createQueueFiles(queueName1);
        messageFileManager.createQueueFiles(queueName2);
    }

    // 這個方法就是每個用例執(zhí)行完畢之后的收尾工作
    @AfterEach
    public void tearDown() throws IOException {
        // 收尾階段, 就把剛才的隊列給干掉.
        messageFileManager.destroyQueueFiles(queueName1);
        messageFileManager.destroyQueueFiles(queueName2);
    }

    @Test
    public void testCreateFiles() {
        // 創(chuàng)建隊列文件已經(jīng)在上面 setUp 階段執(zhí)行過了. 此處主要是驗證看看文件是否存在.
        File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile1.isFile());
        File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile1.isFile());

        File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile2.isFile());
        File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile2.isFile());
    }

    @Test
    public void testReadWriteStat() {
        MessageFileManager.Stat stat = new MessageFileManager.Stat();
        stat.totalCount = 100;
        stat.validCount = 50;
        // 此處就需要使用反射的方式, 來調(diào)用 writeStat 和 readStat 了.
        // Java 原生的反射 API 其實非常難用~~
        // 此處使用 Spring 幫我們封裝好的 反射 的工具類.
        ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);

        // 寫入完畢之后, 再調(diào)用一下讀取, 驗證讀取的結果和寫入的數(shù)據(jù)是一致的.
        MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
        Assertions.assertEquals(100, newStat.totalCount);
        Assertions.assertEquals(50, newStat.validCount);
        System.out.println("測試 readStat 和 writeStat 完成!");
    }

    private MSGQueue createTestQueue(String queueName) {
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        return queue;
    }

    private Message createTestMessage(String content) {
        Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
        return message;
    }

    @Test
    public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
        // 構造出消息, 并且構造出隊列.
        Message message = createTestMessage("testMessage");
        // 此處創(chuàng)建的 queue 對象的 name, 不能隨便寫, 只能用 queueName1 和 queueName2. 需要保證這個隊列對象
        // 對應的目錄和文件啥的都存在才行.
        MSGQueue queue = createTestQueue(queueName1);

        // 調(diào)用發(fā)送消息方法
        messageFileManager.sendMessage(queue, message);

        // 檢查 stat 文件.
        MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
        Assertions.assertEquals(1, stat.totalCount);
        Assertions.assertEquals(1, stat.validCount);

        // 檢查 data 文件
        LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(1, messages.size());
        Message curMessage = messages.get(0);
        Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());
        Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());
        Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());
        // 比較兩個字節(jié)數(shù)組的內(nèi)容是否相同, 不能直接使用 assertEquals 了.
        Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());

        System.out.println("message: " + curMessage);
    }

    @Test
    public void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
        // 往隊列中插入 100 條消息, 然后驗證看看這 100 條消息從文件中讀取之后, 是否和最初是一致的.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 讀取所有消息
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(expectedMessages.size(), actualMessages.size());
        for (int i = 0; i < expectedMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }

    @Test
    public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
        // 創(chuàng)建隊列, 寫入 10 個消息. 刪除其中的幾個消息. 再把所有消息讀取出來, 判定是否符合預期.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 刪除其中的三個消息
        messageFileManager.deleteMessage(queue, expectedMessages.get(7));
        messageFileManager.deleteMessage(queue, expectedMessages.get(8));
        messageFileManager.deleteMessage(queue, expectedMessages.get(9));

        // 對比這里的內(nèi)容是否正確.
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(7, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }

    @Test
    public void testGC() throws IOException, MqException, ClassNotFoundException {
        // 先往隊列中寫 100 個消息. 獲取到文件大小.
        // 再把 100 個消息中的一半, 都給刪除掉(比如把下標為偶數(shù)的消息都刪除)
        // 再手動調(diào)用 gc 方法, 檢測得到的新的文件的大小是否比之前縮小了.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 獲取 gc 前的文件大小
        File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long beforeGCLength = beforeGCFile.length();

        // 刪除偶數(shù)下標的消息
        for (int i = 0; i < 100; i += 2) {
            messageFileManager.deleteMessage(queue, expectedMessages.get(i));
        }

        // 手動調(diào)用 gc
        messageFileManager.gc(queue);

        // 重新讀取文件, 驗證新的文件的內(nèi)容是不是和之前的內(nèi)容匹配
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(50, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
            // 把之前消息偶數(shù)下標的刪了, 剩下的就是奇數(shù)下標的元素了.
            // actual 中的 0 對應 expected 的 1
            // actual 中的 1 對應 expected 的 3
            // actual 中的 2 對應 expected 的 5
            // actual 中的 i 對應 expected 的 2 * i + 1
            Message expectedMessage = expectedMessages.get(2 * i + 1);
            Message actualMessage = actualMessages.get(i);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
        // 獲取新的文件的大小
        File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long afterGCLength = afterGCFile.length();
        System.out.println("before: " + beforeGCLength);
        System.out.println("after: " + afterGCLength);
        Assertions.assertTrue(beforeGCLength > afterGCLength);
    }
}

?七. 整合數(shù)據(jù)庫和?件

上述代碼中, 使?數(shù)據(jù)庫存儲了 Exchange, Queue, Binding, 使??本?件存儲了 Message. 接下來我們把兩個部分整合起來, 統(tǒng)?進?管理.

創(chuàng)建 DiskDataCenter 使? DiskDataCenter 來綜合管理數(shù)據(jù)庫和?本?件的內(nèi)容. DiskDataCenter 會持有 DataBaseManager 和 MessageFileManager 對象.

public class DiskDataCenter {
    // 這個實例用來管理數(shù)據(jù)庫中的數(shù)據(jù)
    private DataBaseManager dataBaseManager = new DataBaseManager();
    // 這個實例用來管理數(shù)據(jù)文件中的數(shù)據(jù)
    private MessageFileManager messageFileManager = new MessageFileManager();

    public void init() {
        // 針對上述兩個實例進行初始化.
        dataBaseManager.init();
        // 當前 messageFileManager.init 是空的方法, 只是先列在這里, 一旦后續(xù)需要擴展, 就在這里進行初始化即可.
        messageFileManager.init();
    }

    // 封裝交換機操作
    public void insertExchange(Exchange exchange) {
        dataBaseManager.insertExchange(exchange);
    }

    public void deleteExchange(String exchangeName) {
        dataBaseManager.deleteExchange(exchangeName);
    }

    public List<Exchange> selectAllExchanges() {
        return dataBaseManager.selectAllExchanges();
    }

    // 封裝隊列操作
    public void insertQueue(MSGQueue queue) throws IOException {
        dataBaseManager.insertQueue(queue);
        // 創(chuàng)建隊列的同時, 不僅僅是把隊列對象寫到數(shù)據(jù)庫中, 還需要創(chuàng)建出對應的目錄和文件
        messageFileManager.createQueueFiles(queue.getName());
    }

    public void deleteQueue(String queueName) throws IOException {
        dataBaseManager.deleteQueue(queueName);
        // 刪除隊列的同時, 不僅僅是把隊列從數(shù)據(jù)庫中刪除, 還需要刪除對應的目錄和文件
        messageFileManager.destroyQueueFiles(queueName);
    }

    public List<MSGQueue> selectAllQueues() {
        return dataBaseManager.selectAllQueues();
    }

    // 封裝綁定操作
    public void insertBinding(Binding binding) {
        dataBaseManager.insertBinding(binding);
    }

    public void deleteBinding(Binding binding) {
        dataBaseManager.deleteBinding(binding);
    }

    public List<Binding> selectAllBindings() {
        return dataBaseManager.selectAllBindings();
    }

    // 封裝消息操作
    public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
        messageFileManager.sendMessage(queue, message);
    }

    public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {
        messageFileManager.deleteMessage(queue, message);
        if (messageFileManager.checkGC(queue.getName())) {
            messageFileManager.gc(queue);
        }
    }

    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        return messageFileManager.loadAllMessageFromQueue(queueName);
    }
}

?八. 內(nèi)存數(shù)據(jù)結構設計

?硬盤上存儲數(shù)據(jù), 只是為了實現(xiàn) "持久化" 這樣的效果. 但是實際的消息存儲/轉發(fā), 還是主要靠內(nèi)存的結 構. 對于 MQ 來說, 內(nèi)存部分是更關鍵的, 內(nèi)存速度更快, 可以達成更?的并發(fā).

?創(chuàng)建 MemoryDataCenter

mqserver.datacenter.MemoryDataCenter?

public class MemoryDataCenter {
    // key 是 exchangeName, value 是 Exchange 對象
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
    // key 是 queueName, value 是 MSGQueue 對象
    private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
    // 第一個 key 是 exchangeName, 第二個 key 是 queueName
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
    // key 是 messageId, value 是 Message 對象
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
    // key 是 queueName, value 是一個 Message 的鏈表
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
    // 第一個 key 是 queueName, 第二個 key 是 messageId
    private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
    public void init(){
    }
}

? 使?四個哈希表, 管理 Exchange, Queue, Binding, Message.

? 使??個哈希表 + 鏈表管理 隊列 -> 消息 之間的關系.

? 使??個哈希表 + 哈希表管理所有的未被確認的消息.

為了保證消息被正確消費了, 會使?兩種?式進?確認. ?動 ACK 和 ?動 ACK. 其中?動 ACK 是指當消息被消費之后, 就會?即被銷毀釋放. 其中?動 ACK 是指當消息被消費之后, 由消費者主動調(diào)??個 basicAck ?法, 進?主動確認. 服務器 收到這個確認之后, 才能真正銷毀消息. 此處的 "未確認消息" 就是指在?動 ACK 模式下, 該消息還沒有被調(diào)? basicAck. 此時消息不能刪除, 但是要和其他未消費的消息區(qū)分開. 于是另搞了個結構. 當后續(xù) basicAck 到了, 就可以刪除消息了.?

?1.封裝 Exchange 、Queue 、Binding 、Message 方法

   public void insertExchange(Exchange exchange) {
        exchangeMap.put(exchange.getName(), exchange);
        System.out.println("[MemoryDataCenter] 新交換機添加成功! exchangeName=" + exchange.getName());
    }

    public Exchange getExchange(String exchangeName) {
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName) {
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 交換機刪除成功! exchangeName=" + exchangeName);
    }

    public void insertQueue(MSGQueue queue) {
        queueMap.put(queue.getName(), queue);
        System.out.println("[MemoryDataCenter] 新隊列添加成功! queueName=" + queue.getName());
    }

    public MSGQueue getQueue(String queueName) {
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName) {
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 隊列刪除成功! queueName=" + queueName);
    }

    public void insertBinding(Binding binding) throws MqException {
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
//        if (bindingMap == null) {
//            bindingMap = new ConcurrentHashMap<>();
//            bindingsMap.put(binding.getExchangeName(), bindingMap);
//        }
        // 先使用 exchangeName 查一下, 對應的哈希表是否存在. 不存在就創(chuàng)建一個.
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                k -> new ConcurrentHashMap<>());

        synchronized (bindingMap) {
            // 再根據(jù) queueName 查一下. 如果已經(jīng)存在, 就拋出異常. 不存在才能插入.
            if (bindingMap.get(binding.getQueueName()) != null) {
                throw new MqException("[MemoryDataCenter] 綁定已經(jīng)存在! exchangeName=" + binding.getExchangeName() +
                        ", queueName=" + binding.getQueueName());
            }
            bindingMap.put(binding.getQueueName(), binding);
        }
        System.out.println("[MemoryDataCenter] 新綁定添加成功! exchangeName=" + binding.getExchangeName()
                + ", queueName=" + binding.getQueueName());
    }

    // 獲取綁定, 寫兩個版本:
    // 1. 根據(jù) exchangeName 和 queueName 確定唯一一個 Binding
    // 2. 根據(jù) exchangeName 獲取到所有的 Binding
    public Binding getBinding(String exchangeName, String queueName) {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if (bindingMap == null) {
            return null;
        }
        return bindingMap.get(queueName);
    }

    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
        return bindingsMap.get(exchangeName);
    }

    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if (bindingMap == null) {
            // 該交換機沒有綁定任何隊列. 報錯.
            throw new MqException("[MemoryDataCenter] 綁定不存在! exchangeName=" + binding.getExchangeName()
                    + ", queueName=" + binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 綁定刪除成功! exchangeName=" + binding.getExchangeName()
                + ", queueName=" + binding.getQueueName());
    }

    // 添加消息
    public void addMessage(Message message) {
        messageMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter] 新消息添加成功! messageId=" + message.getMessageId());
    }

    // 根據(jù) id 查詢消息
    public Message getMessage(String messageId) {
        return messageMap.get(messageId);
    }

    // 根據(jù) id 刪除消息
    public void removeMessage(String messageId) {
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息被移除! messageId=" + messageId);
    }

    // 發(fā)送消息到指定隊列
    public void sendMessage(MSGQueue queue, Message message) {
        // 把消息放到對應的隊列數(shù)據(jù)結構中.
        // 先根據(jù)隊列的名字, 找到該隊列對應的消息鏈表.
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
        // 再把數(shù)據(jù)加到 messages 里面
        synchronized (messages) {
            messages.add(message);
        }
        // 在這里把該消息也往消息中心中插入一下. 假設如果 message 已經(jīng)在消息中心存在, 重復插入也沒關系.
        // 主要就是相同 messageId, 對應的 message 的內(nèi)容一定是一樣的. (服務器代碼不會對 Message 內(nèi)容做修改 basicProperties 和 body)
        addMessage(message);
        System.out.println("[MemoryDataCenter] 消息被投遞到隊列中! messageId=" + message.getMessageId());
    }

    // 從隊列中取消息
    public Message pollMessage(String queueName) {
        // 根據(jù)隊列名, 查找一下, 對應的隊列的消息鏈表.
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if (messages == null) {
            return null;
        }
        synchronized (messages) {
            // 如果沒找到, 說明隊列中沒有任何消息.
            if (messages.size() == 0) {
                return null;
            }
            // 鏈表中有元素, 就進行頭刪.
            Message currentMessage = messages.remove(0);
            System.out.println("[MemoryDataCenter] 消息從隊列中取出! messageId=" + currentMessage.getMessageId());
            return currentMessage;
        }
    }

    // 獲取指定隊列中消息的個數(shù)
    public int getMessageCount(String queueName) {
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if (messages == null) {
            // 隊列中沒有消息
            return 0;
        }
        synchronized (messages) {
            return messages.size();
        }
    }

2.針對未確認的消息的處理?

    // 添加未確認的消息
    public void addMessageWaitAck(String queueName, Message message) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
                k -> new ConcurrentHashMap<>());
        messageHashMap.put(message.getMessageId(), message);
        System.out.println("[MemoryDataCenter] 消息進入待確認隊列! messageId=" + message.getMessageId());
    }

    // 刪除未確認的消息(消息已經(jīng)確認了)
    public void removeMessageWaitAck(String queueName, String messageId) {
        ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if (messageHashMap == null) {
            return;
        }
        messageHashMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息從待確認隊列刪除! messageId=" + messageId);
    }

3.實現(xiàn)重啟后恢復內(nèi)存?

    // 這個方法就是從硬盤上讀取數(shù)據(jù), 把硬盤中之前持久化存儲的各個維度的數(shù)據(jù)都恢復到內(nèi)存中.
    public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        // 0. 清空之前的所有數(shù)據(jù)
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        // 1. 恢復所有的交換機數(shù)據(jù)
        List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
        for (Exchange exchange : exchanges) {
            exchangeMap.put(exchange.getName(), exchange);
        }
        // 2. 恢復所有的隊列數(shù)據(jù)
        List<MSGQueue> queues = diskDataCenter.selectAllQueues();
        for (MSGQueue queue : queues) {
            queueMap.put(queue.getName(), queue);
        }
        // 3. 恢復所有的綁定數(shù)據(jù)
        List<Binding> bindings = diskDataCenter.selectAllBindings();
        for (Binding binding : bindings) {
            ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                    k -> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(), binding);
        }
        // 4. 恢復所有的消息數(shù)據(jù)
        //    遍歷所有的隊列, 根據(jù)每個隊列的名字, 獲取到所有的消息.
        for (MSGQueue queue : queues) {
            LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
            queueMessageMap.put(queue.getName(), messages);
            for (Message message : messages) {
                messageMap.put(message.getMessageId(), message);
            }
        }
        // 注意!! 針對 "未確認的消息" 這部分內(nèi)存中的數(shù)據(jù), 不需要從硬盤恢復. 之前考慮硬盤存儲的時候, 也沒設定這一塊.
        // 一旦在等待 ack 的過程中, 服務器重啟了, 此時這些 "未被確認的消息", 就恢復成 "未被取走的消息" .
        // 這個消息在硬盤上存儲的時候, 就是當做 "未被取走"
    }

4.測試 MemoryDataCenter?

創(chuàng)建 MemoryDataCenterTests

@SpringBootTest
public class MemoryDataCenterTests {
    private MemoryDataCenter memoryDataCenter = null;

    @BeforeEach
    public void setUp() {
        memoryDataCenter = new MemoryDataCenter();
    }

    @AfterEach
    public void tearDown() {
        memoryDataCenter = null;
    }

    // 創(chuàng)建一個測試交換機
    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        return exchange;
    }

    // 創(chuàng)建一個測試隊列
    private MSGQueue createTestQueue(String queueName) {
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        return queue;
    }

    // 針對交換機進行測試
    @Test
    public void testExchange() {
        // 1. 先構造一個交換機并插入.
        Exchange expectedExchange = createTestExchange("testExchange");
        memoryDataCenter.insertExchange(expectedExchange);
        // 2. 查詢出這個交換機, 比較結果是否一致. 此處直接比較這倆引用指向同一個對象.
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange, actualExchange);
        // 3. 刪除這個交換機
        memoryDataCenter.deleteExchange("testExchange");
        // 4. 再查一次, 看是否就查不到了
        actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertNull(actualExchange);
    }

    // 針對隊列進行測試
    @Test
    public void testQueue() {
        // 1. 構造一個隊列, 并插入
        MSGQueue expectedQueue = createTestQueue("testQueue");
        memoryDataCenter.insertQueue(expectedQueue);
        // 2. 查詢這個隊列, 并比較
        MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue, actualQueue);
        // 3. 刪除這個隊列
        memoryDataCenter.deleteQueue("testQueue");
        // 4. 再次查詢隊列, 看是否能查到
        actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertNull(actualQueue);
    }

    // 針對綁定進行測試
    @Test
    public void testBinding() throws MqException {
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        memoryDataCenter.insertBinding(expectedBinding);

        Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertEquals(expectedBinding, actualBinding);

        ConcurrentHashMap<String, Binding> bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertEquals(1, bindingMap.size());
        Assertions.assertEquals(expectedBinding, bindingMap.get("testQueue"));

        memoryDataCenter.deleteBinding(expectedBinding);

        actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertNull(actualBinding);
    }

    private Message createTestMessage(String content) {
        Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
        return message;
    }

    @Test
    public void testMessage() {
        Message expectedMessage = createTestMessage("testMessage");
        memoryDataCenter.addMessage(expectedMessage);

        Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage, actualMessage);

        memoryDataCenter.removeMessage(expectedMessage.getMessageId());

        actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

    @Test
    public void testSendMessage() {
        // 1. 創(chuàng)建一個隊列, 創(chuàng)建 10 條消息, 把這些消息都插入隊列中.
        MSGQueue queue = createTestQueue("testQueue");
        List<Message> expectedMessages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = createTestMessage("testMessage" + i);
            memoryDataCenter.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 2. 從隊列中取出這些消息.
        List<Message> actualMessages = new ArrayList<>();
        while (true) {
            Message message = memoryDataCenter.pollMessage("testQueue");
            if (message == null) {
                break;
            }
            actualMessages.add(message);
        }

        // 3. 比較取出的消息和之前的消息是否一致.
        Assertions.assertEquals(expectedMessages.size(), actualMessages.size());
        for (int i = 0; i < expectedMessages.size(); i++) {
            Assertions.assertEquals(expectedMessages.get(i), actualMessages.get(i));
        }
    }

    @Test
    public void testMessageWaitAck() {
        Message expectedMessage = createTestMessage("expectedMessage");
        memoryDataCenter.addMessageWaitAck("testQueue", expectedMessage);

        Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage, actualMessage);

        memoryDataCenter.removeMessageWaitAck("testQueue", expectedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessageWaitAck("testQueue", expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

    @Test
    public void testRecovery() throws IOException, MqException, ClassNotFoundException {
        // 由于后續(xù)需要進行數(shù)據(jù)庫操作, 依賴 MyBatis. 就需要先啟動 SpringApplication, 這樣才能進行后續(xù)的數(shù)據(jù)庫操作.
        MqApplication.context = SpringApplication.run(MqApplication.class);

        // 1. 在硬盤上構造好數(shù)據(jù)
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();

        // 構造交換機
        Exchange expectedExchange = createTestExchange("testExchange");
        diskDataCenter.insertExchange(expectedExchange);

        // 構造隊列
        MSGQueue expectedQueue = createTestQueue("testQueue");
        diskDataCenter.insertQueue(expectedQueue);

        // 構造綁定
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        diskDataCenter.insertBinding(expectedBinding);

        // 構造消息
        Message expectedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(expectedQueue, expectedMessage);

        // 2. 執(zhí)行恢復操作
        memoryDataCenter.recovery(diskDataCenter);

        // 3. 對比結果
        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());
        Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());
        Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());


        MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());
        Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());


        Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
        Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());
        Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());
        Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());

        Message actualMessage = memoryDataCenter.pollMessage("testQueue");
        Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
        Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
        Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());

        // 4. 清理硬盤的數(shù)據(jù), 把整個 data 目錄里的內(nèi)容都刪掉(包含了 meta.db 和 隊列的目錄).
        MqApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }
}

九. 虛擬主機設計?

?此, 內(nèi)存和硬盤的數(shù)據(jù)都已經(jīng)組織完成. 接下來使? "虛擬主機" 這個概念, 把這兩部分的數(shù)據(jù)也串起 來. 并且實現(xiàn)?些 MQ 的關鍵 API.

1.創(chuàng)建 VirtualHost?

public class VirtualHost {
    private String virtualHostName;
    private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
    private DiskDataCenter diskDataCenter = new DiskDataCenter();
    private Router router = new Router();
    private ConsumerManager consumerManager = new ConsumerManager(this);
}

2.實現(xiàn)構造?法和 getter?

?構造?法中會針對 DiskDataCenter 和 MemoryDataCenter 進?初始化. 同時會把硬盤的數(shù)據(jù)恢復到內(nèi)存中.

   public String getVirtualHostName() {
        return virtualHostName;
    }

    public MemoryDataCenter getMemoryDataCenter() {
        return memoryDataCenter;
    }

    public DiskDataCenter getDiskDataCenter() {
        return diskDataCenter;
    }

    public VirtualHost(String name) {
        this.virtualHostName = name;

        // 對于 MemoryDataCenter 來說, 不需要額外的初始化操作的. 只要對象 new 出來就行了
        // 但是, 針對 DiskDataCenter 來說, 則需要進行初始化操作. 建庫建表和初始數(shù)據(jù)的設定.
        diskDataCenter.init();

        // 另外還需要針對硬盤的數(shù)據(jù), 進行恢復到內(nèi)存中.
        try {
            memoryDataCenter.recovery(diskDataCenter);
        } catch (IOException | MqException | ClassNotFoundException e) {
            e.printStackTrace();
            System.out.println("[VirtualHost] 恢復內(nèi)存數(shù)據(jù)失敗!");
        }
    }

3.創(chuàng)建交換機?

約定, 交換機/隊列的名字, 都加上 VirtualHostName 作為前綴. 這樣不同 VirtualHost 中就可以存在 同名的交換機或者隊列了.

exchangeDeclare 的語義是, 不存在就創(chuàng)建, 存在則直接返回. 因此不叫做 "exchangeCreate". 先寫硬盤, 后寫內(nèi)存. 因為寫硬盤失敗概率更?. 如果硬盤寫失敗了, 也就不必寫內(nèi)存了

   // 創(chuàng)建交換機
    // 如果交換機不存在, 就創(chuàng)建. 如果存在, 直接返回.
    // 返回值是 boolean. 創(chuàng)建成功, 返回 true. 失敗返回 false
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) {
        // 把交換機的名字, 加上虛擬主機作為前綴.
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                // 1. 判定該交換機是否已經(jīng)存在. 直接通過內(nèi)存查詢.
                Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
                if (existsExchange != null) {
                    // 該交換機已經(jīng)存在!
                    System.out.println("[VirtualHost] 交換機已經(jīng)存在! exchangeName=" + exchangeName);
                    return true;
                }
                // 2. 真正創(chuàng)建交換機. 先構造 Exchange 對象
                Exchange exchange = new Exchange();
                exchange.setName(exchangeName);
                exchange.setType(exchangeType);
                exchange.setDurable(durable);
                // 3. 把交換機對象寫入硬盤
                if (durable) {
                    diskDataCenter.insertExchange(exchange);
                }
                // 4. 把交換機對象寫入內(nèi)存
                memoryDataCenter.insertExchange(exchange);
                System.out.println("[VirtualHost] 交換機創(chuàng)建完成! exchangeName=" + exchangeName);
                // 上述邏輯, 先寫硬盤, 后寫內(nèi)存. 目的就是因為硬盤更容易寫失敗. 如果硬盤寫失敗了, 內(nèi)存就不寫了.
                // 要是先寫內(nèi)存, 內(nèi)存寫成功了, 硬盤寫失敗了, 還需要把內(nèi)存的數(shù)據(jù)給再刪掉. 就比較麻煩了.
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 交換機創(chuàng)建失敗! exchangeName=" + exchangeName);
            e.printStackTrace();
            return false;
        }
    }

4.刪除交換機?

    // 刪除交換機
    public boolean exchangeDelete(String exchangeName) {
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                // 1. 先找到對應的交換機.
                Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
                if (toDelete == null) {
                    throw new MqException("[VirtualHost] 交換機不存在無法刪除!");
                }
                // 2. 刪除硬盤上的數(shù)據(jù)
                if (toDelete.isDurable()) {
                    diskDataCenter.deleteExchange(exchangeName);
                }
                // 3. 刪除內(nèi)存中的交換機數(shù)據(jù)
                memoryDataCenter.deleteExchange(exchangeName);
                System.out.println("[VirtualHost] 交換機刪除成功! exchangeName=" + exchangeName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 交換機刪除失敗! exchangeName=" + exchangeName);
            e.printStackTrace();
            return false;
        }
    }

5.創(chuàng)建隊列?

   // 創(chuàng)建隊列
    public boolean queueDeclare(String queueName, boolean durable) {
        // 把隊列的名字, 給拼接上虛擬主機的名字.
        queueName = virtualHostName + queueName;
        try {
            synchronized (queueLocker) {
                // 1. 判定隊列是否存在
                MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
                if (existsQueue != null) {
                    System.out.println("[VirtualHost] 隊列已經(jīng)存在! queueName=" + queueName);
                    return true;
                }
                // 2. 創(chuàng)建隊列對象
                MSGQueue queue = new MSGQueue();
                queue.setName(queueName);
                queue.setDurable(durable);
                // 3. 寫硬盤
                if (durable) {
                    diskDataCenter.insertQueue(queue);
                }
                // 4. 寫內(nèi)存
                memoryDataCenter.insertQueue(queue);
                System.out.println("[VirtualHost] 隊列創(chuàng)建成功! queueName=" + queueName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 隊列創(chuàng)建失敗! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

6.刪除隊列?

    // 刪除隊列
    public boolean queueDelete(String queueName) {
        queueName = virtualHostName + queueName;
        try {
            synchronized (queueLocker) {
                // 1. 根據(jù)隊列名字, 查詢下當前的隊列對象
                MSGQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                    throw new MqException("[VirtualHost] 隊列不存在! 無法刪除! queueName=" + queueName);
                }
                // 2. 刪除硬盤數(shù)據(jù)
                if (queue.isDurable()) {
                    diskDataCenter.deleteQueue(queueName);
                }
                // 3. 刪除內(nèi)存數(shù)據(jù)
                memoryDataCenter.deleteQueue(queueName);
                System.out.println("[VirtualHost] 刪除隊列成功! queueName=" + queueName);
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 刪除隊列失敗! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

?7.創(chuàng)建綁定

bindingKey 是進? topic 轉發(fā)時的?個關鍵概念. 使? router 類來檢測是否是合法的 bindingKey.

   public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                synchronized (queueLocker) {
                    // 1. 判定當前的綁定是否已經(jīng)存在了.
                    Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (existsBinding != null) {
                        throw new MqException("[VirtualHost] binding 已經(jīng)存在! queueName=" + queueName
                                + ", exchangeName=" + exchangeName);
                    }
                    // 2. 驗證 bindingKey 是否合法.
                    if (!router.checkBindingKey(bindingKey)) {
                        throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);
                    }
                    // 3. 創(chuàng)建 Binding 對象
                    Binding binding = new Binding();
                    binding.setExchangeName(exchangeName);
                    binding.setQueueName(queueName);
                    binding.setBindingKey(bindingKey);
                    // 4. 獲取一下對應的交換機和隊列. 如果交換機或者隊列不存在, 這樣的綁定也是無法創(chuàng)建的.
                    MSGQueue queue = memoryDataCenter.getQueue(queueName);
                    if (queue == null) {
                        throw new MqException("[VirtualHost] 隊列不存在! queueName=" + queueName);
                    }
                    Exchange exchange = memoryDataCenter.getExchange(exchangeName);
                    if (exchange == null) {
                        throw new MqException("[VirtualHost] 交換機不存在! exchangeName=" + exchangeName);
                    }
                    // 5. 先寫硬盤
                    if (queue.isDurable() && exchange.isDurable()) {
                        diskDataCenter.insertBinding(binding);
                    }
                    // 6. 寫入內(nèi)存
                    memoryDataCenter.insertBinding(binding);
                }
            }
            System.out.println("[VirtualHost] 綁定創(chuàng)建成功! exchangeName=" + exchangeName
                    + ", queueName=" + queueName);
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 綁定創(chuàng)建失敗! exchangeName=" + exchangeName
                    + ", queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

8.刪除綁定?

    public boolean queueUnbind(String queueName, String exchangeName) {
        queueName = virtualHostName + queueName;
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker) {
                synchronized (queueLocker) {
                    // 1. 獲取 binding 看是否已經(jīng)存在~
                    Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
                    if (binding == null) {
                        throw new MqException("[VirtualHost] 刪除綁定失敗! 綁定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);
                    }
                    // 2. 無論綁定是否持久化了, 都嘗試從硬盤刪一下. 就算不存在, 這個刪除也無副作用.
                    diskDataCenter.deleteBinding(binding);
                    // 3. 刪除內(nèi)存的數(shù)據(jù)
                    memoryDataCenter.deleteBinding(binding);
                    System.out.println("[VirtualHost] 刪除綁定成功!");
                }
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 刪除綁定失敗!");
            e.printStackTrace();
            return false;
        }
    }

?9.發(fā)布消息

? 發(fā)布消息其實是把消息發(fā)送給指定的 Exchange, 再根據(jù) Exchange 和 Queue 的 Binding 關系, 轉發(fā) 到對應隊列中.

? 發(fā)送消息需要指定 routingKey, 這個值的作?和 ExchangeType 是相關的.

? Direct: routingKey 就是對應隊列的名字. 此時不需要 binding 關系, 也不需要 bindingKey, 就可 以直接轉發(fā)消息.

? Fanout: routingKey 不起作?, bindingKey 也不起作?. 此時消息會轉發(fā)給綁定到該交換機上的 所有隊列中.

? Topic: routingKey 是?個特定的字符串, 會和 bindingKey 進?匹配. 如果匹配成功, 則發(fā)到對應 的隊列中. 具體規(guī)則后續(xù)介紹.

? BasicProperties 是消息的元信息. body 是消息本體.

    // 發(fā)送消息到指定的交換機/隊列中.
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
        try {
            // 1. 轉換交換機的名字
            exchangeName = virtualHostName + exchangeName;
            // 2. 檢查 routingKey 是否合法.
            if (!router.checkRoutingKey(routingKey)) {
                throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
            }
            // 3. 查找交換機對象
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if (exchange == null) {
                throw new MqException("[VirtualHost] 交換機不存在! exchangeName=" + exchangeName);
            }
            // 4. 判定交換機的類型
            if (exchange.getType() == ExchangeType.DIRECT) {
                // 按照直接交換機的方式來轉發(fā)消息
                // 以 routingKey 作為隊列的名字, 直接把消息寫入指定的隊列中.
                // 此時, 可以無視綁定關系.
                String queueName = virtualHostName + routingKey;
                // 5. 構造消息對象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                // 6. 查找該隊列名對應的對象
                MSGQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                    throw new MqException("[VirtualHost] 隊列不存在! queueName=" + queueName);
                }
                // 7. 隊列存在, 直接給隊列中寫入消息
                sendMessage(queue, message);
            } else {
                // 按照 fanout 和 topic 的方式來轉發(fā).
                // 5. 找到該交換機關聯(lián)的所有綁定, 并遍歷這些綁定對象
                ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
                for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
                    // 1) 獲取到綁定對象, 判定對應的隊列是否存在
                    Binding binding = entry.getValue();
                    MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
                    if (queue == null) {
                        // 此處咱們就不拋出異常了. 可能此處有多個這樣的隊列.
                        // 希望不要因為一個隊列的失敗, 影響到其他隊列的消息的傳輸.
                        System.out.println("[VirtualHost] basicPublish 發(fā)送消息時, 發(fā)現(xiàn)隊列不存在! queueName=" + binding.getQueueName());
                        continue;
                    }
                    // 2) 構造消息對象
                    Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                    // 3) 判定這個消息是否能轉發(fā)給該隊列.
                    //    如果是 fanout, 所有綁定的隊列都要轉發(fā)的.
                    //    如果是 topic, 還需要判定下, bindingKey 和 routingKey 是不是匹配.
                    if (!router.route(exchange.getType(), binding, message)) {
                        continue;
                    }
                    // 4) 真正轉發(fā)消息給隊列
                    sendMessage(queue, message);
                }
            }
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] 消息發(fā)送失敗!");
            e.printStackTrace();
            return false;
        }
    }

    private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
        // 此處發(fā)送消息, 就是把消息寫入到 硬盤 和 內(nèi)存 上.
        int deliverMode = message.getDeliverMode();
        // deliverMode 為 1 , 不持久化. deliverMode 為 2 表示持久化.
        if (deliverMode == 2) {
            diskDataCenter.sendMessage(queue, message);
        }
        // 寫入內(nèi)存
        memoryDataCenter.sendMessage(queue, message);

        // 此處還需要補充一個邏輯, 通知消費者可以消費消息了.
        consumerManager.notifyConsume(queue.getName());
    }

10.路由規(guī)則?

1) 實現(xiàn) route ?法

public class Router {
    public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {
        // 根據(jù)不同的 exchangeType 使用不同的判定轉發(fā)規(guī)則.
        if (exchangeType == ExchangeType.FANOUT) {
            // 如果是 FANOUT 類型, 則該交換機上綁定的所有隊列都需要轉發(fā)
            return true;
        } else if (exchangeType == ExchangeType.TOPIC) {
            // 如果是 TOPIC 主題交換機, 規(guī)則就要更復雜一些.
            return routeTopic(binding, message);
        } else {
            // 其他情況是不應該存在的.
            throw new MqException("[Router] 交換機類型非法! exchangeType=" + exchangeType);
        }
    }
}

2) 實現(xiàn) checkRoutingKeyValid

    // routingKey 的構造規(guī)則:
    // 1. 數(shù)字, 字母, 下劃線
    // 2. 使用 . 分割成若干部分
    public boolean checkRoutingKey(String routingKey) {
        if (routingKey.length() == 0) {
            // 空字符串. 合法的情況. 比如在使用 fanout 交換機的時候, routingKey 用不上, 就可以設為 ""
            return true;
        }
        for (int i = 0; i < routingKey.length(); i++) {
            char ch = routingKey.charAt(i);
            // 判定該字符是否是大寫字母
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            // 判定該字母是否是小寫字母
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            // 判定該字母是否是阿拉伯數(shù)字
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            // 判定是否是 _ 或者 .
            if (ch == '_' || ch == '.') {
                continue;
            }
            // 該字符, 不是上述任何一種合法情況, 就直接返回 false
            return false;
        }
        // 把每個字符都檢查過, 沒有遇到非法情況. 此時直接返回 true
        return true;
    }

3) 實現(xiàn) checkBindingKeyValid?

   // bindingKey 的構造規(guī)則:
    // 1. 數(shù)字, 字母, 下劃線
    // 2. 使用 . 分割成若干部分
    // 3. 允許存在 * 和 # 作為通配符. 但是通配符只能作為獨立的分段.
    public boolean checkBindingKey(String bindingKey) {
        if (bindingKey.length() == 0) {
            // 空字符串, 也是合法情況. 比如在使用 direct / fanout 交換機的時候, bindingKey 是用不上的.
            return true;
        }
        // 檢查字符串中不能存在非法字符
        for (int i = 0; i < bindingKey.length(); i++) {
            char ch = bindingKey.charAt(i);
            if (ch >= 'A' && ch <= 'Z') {
                continue;
            }
            if (ch >= 'a' && ch <= 'z') {
                continue;
            }
            if (ch >= '0' && ch <= '9') {
                continue;
            }
            if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
                continue;
            }
            return false;
        }
        // 檢查 * 或者 # 是否是獨立的部分.
        // aaa.*.bbb 合法情況;  aaa.a*.bbb 非法情況.
        String[] words = bindingKey.split("\\.");
        for (String word : words) {
            // 檢查 word 長度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
            if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
                return false;
            }
        }
        // 約定一下, 通配符之間的相鄰關系(人為(俺)約定的).
        // 為啥這么約定? 因為前三種相鄰的時候, 實現(xiàn)匹配的邏輯會非常繁瑣, 同時功能性提升不大~~
        // 1. aaa.#.#.bbb    => 非法
        // 2. aaa.#.*.bbb    => 非法
        // 3. aaa.*.#.bbb    => 非法
        // 4. aaa.*.*.bbb    => 合法
        for (int i = 0; i < words.length - 1; i++) {
            // 連續(xù)兩個 ##
            if (words[i].equals("#") && words[i + 1].equals("#")) {
                return false;
            }
            // # 連著 *
            if (words[i].equals("#") && words[i + 1].equals("*")) {
                return false;
            }
            // * 連著 #
            if (words[i].equals("*") && words[i + 1].equals("#")) {
                return false;
            }
        }
        return true;
    }

4) 實現(xiàn) routeTopic

    private boolean routeTopic(Binding binding, Message message) {
        // 先把這兩個 key 進行切分
        String[] bindingTokens = binding.getBindingKey().split("\\.");
        String[] routingTokens = message.getRoutingKey().split("\\.");

        // 引入兩個下標, 指向上述兩個數(shù)組. 初始情況下都為 0
        int bindingIndex = 0;
        int routingIndex = 0;
        // 此處使用 while 更合適, 每次循環(huán), 下標不一定就是 + 1, 不適合使用 for
        while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {
            if (bindingTokens[bindingIndex].equals("*")) {
                // [情況二] 如果遇到 * , 直接進入下一輪. * 可以匹配到任意一個部分!!
                bindingIndex++;
                routingIndex++;
                continue;
            } else if (bindingTokens[bindingIndex].equals("#")) {
                // 如果遇到 #, 需要先看看有沒有下一個位置.
                bindingIndex++;
                if (bindingIndex == bindingTokens.length) {
                    // [情況三] 該 # 后面沒東西了, 說明此時一定能匹配成功了!
                    return true;
                }
                // [情況四] # 后面還有東西, 拿著這個內(nèi)容, 去 routingKey 中往后找, 找到對應的位置.
                // findNextMatch 這個方法用來查找該部分在 routingKey 的位置. 返回該下標. 沒找到, 就返回 -1
                routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);
                if (routingIndex == -1) {
                    // 沒找到匹配的結果. 匹配失敗
                    return false;
                }
                // 找到的匹配的情況, 繼續(xù)往后匹配.
                bindingIndex++;
                routingIndex++;
            } else {
                // [情況一] 如果遇到普通字符串, 要求兩邊的內(nèi)容是一樣的.
                if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {
                    return false;
                }
                bindingIndex++;
                routingIndex++;
            }
        }
        // [情況五] 判定是否是雙方同時到達末尾
        // 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失敗的.
        if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
            return true;
        }
        return false;
    }

?6) 測試 Router

@SpringBootTest
public class RouterTests {
    private Router router = new Router();
    private Binding binding = null;
    private Message message = null;

    @BeforeEach
    public void setUp() {
        binding = new Binding();
        message = new Message();
    }

    @AfterEach
    public void tearDown() {
        binding = null;
        message = null;
    }

    // [測試用例]
    // binding key          routing key         result
    // aaa                  aaa                 true
    // aaa.bbb              aaa.bbb             true
    // aaa.bbb              aaa.bbb.ccc         false
    // aaa.bbb              aaa.ccc             false
    // aaa.bbb.ccc          aaa.bbb.ccc         true
    // aaa.*                aaa.bbb             true
    // aaa.*.bbb            aaa.bbb.ccc         false
    // *.aaa.bbb            aaa.bbb             false
    // #                    aaa.bbb.ccc         true
    // aaa.#                aaa.bbb             true
    // aaa.#                aaa.bbb.ccc         true
    // aaa.#.ccc            aaa.ccc             true
    // aaa.#.ccc            aaa.bbb.ccc         true
    // aaa.#.ccc            aaa.aaa.bbb.ccc     true
    // #.ccc                ccc                 true
    // #.ccc                aaa.bbb.ccc         true
    @Test
    public void test1() throws MqException {
        binding.setBindingKey("aaa");
        message.setRoutingKey("aaa");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test2() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test3() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test4() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test5() throws MqException {
        binding.setBindingKey("aaa.bbb.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test6() throws MqException {
        binding.setBindingKey("aaa.*");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test7() throws MqException {
        binding.setBindingKey("aaa.*.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test8() throws MqException {
        binding.setBindingKey("*.aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test9() throws MqException {
        binding.setBindingKey("#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test10() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test11() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test12() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test13() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test14() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test15() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }

    @Test
    public void test16() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
}

11.訂閱消息

1) 添加?個訂閱者

   // 訂閱消息.
    // 添加一個隊列的訂閱者, 當隊列收到消息之后, 就要把消息推送給對應的訂閱者.
    // consumerTag: 消費者的身份標識
    // autoAck: 消息被消費完成后, 應答的方式. 為 true 自動應答. 為 false 手動應答.
    // consumer: 是一個回調(diào)函數(shù). 此處類型設定成函數(shù)式接口. 這樣后續(xù)調(diào)用 basicConsume 并且傳實參的時候, 就可以寫作 lambda 樣子了.
    public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        // 構造一個 ConsumerEnv 對象, 把這個對應的隊列找到, 再把這個 Consumer 對象添加到該隊列中.
        queueName = virtualHostName + queueName;
        try {
            consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
            System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] basicConsume 失敗! queueName=" + queueName);
            e.printStackTrace();
            return false;
        }
    }

Consumer 相當于?個回調(diào)函數(shù). 放到 common.Consumer 中.?

@FunctionalInterface
public interface Consumer {
    // Delivery 的意思是 "投遞", 這個方法預期是在每次服務器收到消息之后, 來調(diào)用.
    // 通過這個方法把消息推送給對應的消費者.
    // (注意! 這里的方法名和參數(shù), 也都是參考 RabbitMQ 展開的)
    void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;

}

2) 創(chuàng)建訂閱者管理管理類?

創(chuàng)建 mqserver.core.ConsumerManager?

public class ConsumerManager {
    // 持有上層的 VirtualHost 對象的引用. 用來操作數(shù)據(jù).
    private VirtualHost parent;
    // 指定一個線程池, 負責去執(zhí)行具體的回調(diào)任務.
    private ExecutorService workerPool = Executors.newFixedThreadPool(4);
    // 存放令牌的隊列
    private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
}

? parent ?來記錄虛擬主機.

? 使??個阻塞隊列?來觸發(fā)消息消費. 稱為令牌隊列. 每次有消息過來了, 都往隊列中放?個令牌(也 就是隊列名), 然后消費者再去消費對應隊列的消息.

? 使??個線程池?來執(zhí)?消息回調(diào). 這樣令牌隊列的設定避免搞出來太多線程. 否則就需要給每個隊列都安排?個單獨的線程了, 如果隊 列很多則開銷就?較?了.

?3) 添加令牌接?

// 通知消費者去消費消息
public void notifyConsume(String queueName) throws InterruptedException {
     tokenQueue.put(queueName);
}

?4) 實現(xiàn)添加訂閱者

   public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        // 找到對應的隊列.
        MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
        if (queue == null) {
            throw new MqException("[ConsumerManager] 隊列不存在! queueName=" + queueName);
        }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
        synchronized (queue) {
            queue.addConsumerEnv(consumerEnv);
            // 如果當前隊列中已經(jīng)有了一些消息了, 需要立即就消費掉.
            int n = parent.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 這個方法調(diào)用一次就消費一條消息.
                consumeMessage(queue);
            }
        }
    }

創(chuàng)建 ConsumerEnv , 這個類表??個訂閱者的執(zhí)?環(huán)境.

public class ConsumerEnv {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
    // 通過這個回調(diào)來處理收到的消息.
    private Consumer consumer;

    public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        this.consumerTag = consumerTag;
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumer = consumer;
    }
}

5) 實現(xiàn)掃描線程?

在 ConsumerManager 中創(chuàng)建?個線程, 不停的嘗試掃描令牌隊列. 如果拿到了令牌, 就真正觸發(fā)消費消 息操作.

    public ConsumerManager(VirtualHost p) {
        parent = p;

        scannerThread = new Thread(() -> {
            while (true) {
                try {
                    // 1. 拿到令牌
                    String queueName = tokenQueue.take();
                    // 2. 根據(jù)令牌, 找到隊列
                    MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
                    if (queue == null) {
                        throw new MqException("[ConsumerManager] 取令牌后發(fā)現(xiàn), 該隊列名不存在! queueName=" + queueName);
                    }
                    // 3. 從這個隊列中消費一個消息.
                    synchronized (queue) {
                        consumeMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    e.printStackTrace();
                }
            }
        });
        // 把線程設為后臺線程.
        scannerThread.setDaemon(true);
        scannerThread.start();
    }

?6) 實現(xiàn)消費消息

所謂的消費消息, 其實就是調(diào)?消息的回調(diào). 并把消息刪除掉.

   private void consumeMessage(MSGQueue queue) {
        // 1. 按照輪詢的方式, 找個消費者出來.
        ConsumerEnv luckyDog = queue.chooseConsumer();
        if (luckyDog == null) {
            // 當前隊列沒有消費者, 暫時不消費. 等后面有消費者出現(xiàn)再說.
            return;
        }
        // 2. 從隊列中取出一個消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null) {
            // 當前隊列中還沒有消息, 也不需要消費.
            return;
        }
        // 3. 把消息帶入到消費者的回調(diào)方法中, 丟給線程池執(zhí)行.
        workerPool.submit(() -> {
            try {
                // 1. 把消息放到待確認的集合中. 這個操作勢必在執(zhí)行回調(diào)之前.
                parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
                // 2. 真正執(zhí)行回調(diào)操作
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
                        message.getBody());
                // 3. 如果當前是 "自動應答" , 就可以直接把消息刪除了.
                //    如果當前是 "手動應答" , 則先不處理, 交給后續(xù)消費者調(diào)用 basicAck 方法來處理.
                if (luckyDog.isAutoAck()) {
                    // 1) 刪除硬盤上的消息
                    if (message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    // 2) 刪除上面的待確認集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    // 3) 刪除內(nèi)存中消息中心里的消息
                    parent.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消費! queueName=" + queue.getName());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

?7)?結

?. 消費消息的兩種典型情況

1) 訂閱者已經(jīng)存在了, 才發(fā)送消息 這種直接獲取隊列的訂閱者, 從中按照輪詢的?式挑?個消費者來調(diào)?回調(diào)即可.

2) 消息先發(fā)送到隊列了, 訂閱者還沒到. 此時當訂閱者到達, 就快速把指定隊列中的消息全都消費掉.

?. 關于消息不丟失的論證 每個消息在從內(nèi)存隊列中出隊列時, 都會先進? 待確認 中.

? 如果 autoAck 為 true 消息被消費完畢后(執(zhí)?完消息回調(diào)之后), 再執(zhí)?清除?作. 分別清除硬盤數(shù)據(jù), 待確認隊列, 消息中?.

? 如果 autoAck 為 false 在回調(diào)內(nèi)部, 進?清除?作. 分別清除硬盤數(shù)據(jù), 待確認隊列, 消息中?.?

?1) 執(zhí)?消息回調(diào)的時候拋出異常 此時消息仍然處在待確認隊列中. 此時可以??個線程掃描待確認隊列, 如果發(fā)現(xiàn)隊列中的消息超時未確認, 則放?死信隊列. 死信隊列咱們此處暫不實現(xiàn).

2) 執(zhí)?消息回調(diào)的時候服務器宕機 內(nèi)存所有數(shù)據(jù)都沒了, 但是消息在硬盤上仍然存在. 會在服務下次啟動的時候, 加載回內(nèi)存. 重新被消費.

?12.消息確認

下列?法只是?動應答的時候才會使?. 應答成功, 則把消息刪除掉.

 public boolean basicAck(String queueName, String messageId) {
        queueName = virtualHostName + queueName;
        try {
            // 1. 獲取到消息和隊列
            Message message = memoryDataCenter.getMessage(messageId);
            if (message == null) {
                throw new MqException("[VirtualHost] 要確認的消息不存在! messageId=" + messageId);
            }
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue == null) {
                throw new MqException("[VirtualHost] 要確認的隊列不存在! queueName=" + queueName);
            }
            // 2. 刪除硬盤上的數(shù)據(jù)
            if (message.getDeliverMode() == 2) {
                diskDataCenter.deleteMessage(queue, message);
            }
            // 3. 刪除消息中心中的數(shù)據(jù)
            memoryDataCenter.removeMessage(messageId);
            // 4. 刪除待確認的集合中的數(shù)據(jù)
            memoryDataCenter.removeMessageWaitAck(queueName, messageId);
            System.out.println("[VirtualHost] basicAck 成功! 消息被成功確認! queueName=" + queueName
                    + ", messageId=" + messageId);
            return true;
        } catch (Exception e) {
            System.out.println("[VirtualHost] basicAck 失敗! 消息確認失敗! queueName=" + queueName
                    + ", messageId=" + messageId);
            e.printStackTrace();
            return false;
        }
    }

13. 測試 VirtualHost

@SpringBootTest
public class VirtualHostTests {
    private VirtualHost virtualHost = null;

    @BeforeEach
    public void setUp() {
        MqApplication.context = SpringApplication.run(MqApplication.class);
        virtualHost = new VirtualHost("default");
    }

    @AfterEach
    public void tearDown() throws IOException {
        MqApplication.context.close();
        virtualHost = null;
        // 把硬盤的目錄刪除掉
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }

    @Test
    public void testExchangeDeclare() {
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);
    }

    @Test
    public void testExchangeDelete() {
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);

        ok = virtualHost.exchangeDelete("testExchange");
        Assertions.assertTrue(ok);
    }

    @Test
    public void testQueueDeclare() {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);
    }

    @Test
    public void testQueueDelete() {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDelete("testQueue");
        Assertions.assertTrue(ok);
    }

    @Test
    public void testQueueBind() {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");
        Assertions.assertTrue(ok);
    }

    @Test
    public void testQueueUnbind() {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");
        Assertions.assertTrue(ok);

        ok = virtualHost.queueUnbind("testQueue", "testExchange");
        Assertions.assertTrue(ok);
    }

    @Test
    public void testBasicPublish() {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);

        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);
    }

    // 先訂閱隊列, 后發(fā)送消息
    @Test
    public void testBasicConsume1() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);

        // 先訂閱隊列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                try {
                    // 消費者自身設定的回調(diào)方法.
                    System.out.println("messageId=" + basicProperties.getMessageId());
                    System.out.println("body=" + new String(body, 0, body.length));

                    Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                    Assertions.assertEquals(1, basicProperties.getDeliverMode());
                    Assertions.assertArrayEquals("hello".getBytes(), body);
                } catch (Error e) {
                    // 斷言如果失敗, 拋出的是 Error, 而不是 Exception!
                    e.printStackTrace();
                    System.out.println("error");
                }
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 再發(fā)送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);
    }

    // 先發(fā)送消息, 后訂閱隊列.
    @Test
    public void testBasicConsume2() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);

        // 先發(fā)送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);

        // 再訂閱隊列
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消費者自身設定的回調(diào)方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicConsumeFanout() throws InterruptedException {
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue1", false);
        Assertions.assertTrue(ok);
        ok = virtualHost.queueBind("testQueue1", "testExchange", "");
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue2", false);
        Assertions.assertTrue(ok);
        ok = virtualHost.queueBind("testQueue2", "testExchange", "");
        Assertions.assertTrue(ok);

        // 往交換機中發(fā)布一個消息
        ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        // 兩個消費者訂閱上述的兩個隊列.
        ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicConsumeTopic() throws InterruptedException {
        boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueDeclare("testQueue", false);
        Assertions.assertTrue(ok);

        ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
        Assertions.assertTrue(ok);

        ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
        Assertions.assertTrue(ok);

        ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("messageId=" + basicProperties.getMessageId());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }

    @Test
    public void testBasicAck() throws InterruptedException {
        boolean ok = virtualHost.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);
        ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
                true);
        Assertions.assertTrue(ok);

        // 先發(fā)送消息
        ok = virtualHost.basicPublish("testExchange", "testQueue", null,
                "hello".getBytes());
        Assertions.assertTrue(ok);

        // 再訂閱隊列 [要改的地方, 把 autoAck 改成 false]
        ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
                // 消費者自身設定的回調(diào)方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));

                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);

                // [要改的地方, 新增手動調(diào)用 basicAck]
                boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
                Assertions.assertTrue(ok);
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);
    }
}

??. ?絡通信協(xié)議設計

1.明確需求

?產(chǎn)者和消費者都是客?端, 都需要通過?絡和 Broker Server 進?通信. 此處我們使? TCP 協(xié)議, 來作為通信的底層協(xié)議. 同時在這個基礎上?定義應?層協(xié)議, 完成客?端對服 務器這邊功能的遠程調(diào)?. 要調(diào)?的功能有:

? 創(chuàng)建 channel

? 關閉 channel

? 創(chuàng)建 exchange

? 刪除 exchange

? 創(chuàng)建 queue

? 刪除 queue

? 創(chuàng)建 binding

? 刪除 binding

? 發(fā)送 message

? 訂閱 message

? 發(fā)送 ack

? 返回 message (服務器 -> 客?端)

2.設計應?層協(xié)議?

使??進制的?式設定協(xié)議.

1)請求《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式

2)響應?《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式

其中 type 表?請求響應不同的功能. 取值如下:?

? 0x1 創(chuàng)建 channel

? 0x2 關閉 channel

? 0x3 創(chuàng)建 exchange

? 0x4 銷毀 exchange

? 0x5 創(chuàng)建 queue

? 0x6 銷毀 queue

? 0x7 創(chuàng)建 binding

? 0x8 銷毀 binding

? 0x9 發(fā)送 message

? 0xa 訂閱 message

? 0xb 返回 ack

? 0xc 服務器給客?端推送的消息. (被訂閱的消息) 響應獨有的.

其中 payload 部分, 會根據(jù)不同的 type, 存在不同的格式.

對于請求來說, payload 表?這次?法調(diào)?的各種參數(shù)信息.

對于響應來說, payload 表?這次?法調(diào)?的返回值.

3.定義 Request / Response

public class Request {
    private int type;
    private int length;
    private byte[] payload;
    //省略 getter setter
}
public class Response {
    private int type;
    private int length;
    private byte[] payload;
    //省略 getter setter
}

4.定義參數(shù)?類

構造?個類表??法的參數(shù), 作為 Request 的 payload. 不同的?法中, 參數(shù)形態(tài)各異, 但是有些信息是通?的, 使??個?類表?出來. 具體每個?法的參數(shù)再 通過繼承的?式體現(xiàn).

public class BaseArguments implements Serializable {
    // 表??次請求/響應的唯? id. ?來把響應和請求對上.
    protected String rid;
    protected String channelId;
    // 省略 getter setter
}

5.定義返回值?類?

public class BaseReturns implements Serializable {
        // 表??次請求/響應的唯? id. ?來把響應和請求對上.
        protected String rid;
        protected String channelId;
        protected boolean ok;
        // 省略 getter setter
}

?6.定義其他參數(shù)類

? 1) ExchangeDeclareArguments

public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private ExchangeType exchangeType;
    private boolean durable;
}

??個創(chuàng)建交換機的請求, 形如:

? 可以把 ExchangeDeclareArguments 轉成 byte[], 就得到了下列圖?的結構. ? 按照 length ?度讀取出 payload, 就可以把讀到的?進制數(shù)據(jù)轉換成 ExchangeDeclareArguments 對象

?《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式

2) ExchangeDeleteArguments?

public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
    private String exchangeName;
}

?3) QueueDeclareArguments

public class QueueDeclareArguments extends BasicArguments implements Serializable {
    private String queueName;
    private boolean durable;
    private boolean exclusive;
    private boolean autoDelete;
    private Map<String, Object> arguments;
}

4) QueueDeleteArguments?

public class QueueDeleteArguments extends BasicArguments implements Serializable {
    private String queueName;
}

5) QueueBindArguments?

public class QueueBindArguments extends BasicArguments implements Serializable {
    private String queueName;
    private String exchangeName;
    private String bindingKey;
}

6) QueueUnbindArguments?

public class QueueUnbindArguments extends BasicArguments implements Serializable {
    private String queueName;
    private String exchangeName;
}

7) BasicPublishArguments?

public class BasicPublishArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String routingKey;
    private BasicProperties basicProperties;
    private byte[] body;
}

?8) BasicConsumeArguments

public class BasicConsumeArguments extends BasicArguments implements Serializable {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
}

9) SubScribeReturns

public class SubScribeReturns extends BasicReturns implements Serializable {
    private String consumerTag;
    private BasicProperties basicProperties;
    private byte[] body;
}

?一. 實現(xiàn) BrokerServer

1.創(chuàng)建 BrokerServer 類

public class BrokerServer {
    private ServerSocket serverSocket = null;

    // 當前考慮一個 BrokerServer 上只有一個 虛擬主機
    private VirtualHost virtualHost = new VirtualHost("default");
    // 使用這個 哈希表 表示當前的所有會話(也就是說有哪些客戶端正在和咱們的服務器進行通信)
    // 此處的 key 是 channelId, value 為對應的 Socket 對象
    private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();
    // 引入一個線程池, 來處理多個客戶端的請求.
    private ExecutorService executorService = null;
    // 引入一個 boolean 變量控制服務器是否繼續(xù)運行
    private volatile boolean runnable = true;
}

? virtualHost 表?服務器持有的虛擬主機. 隊列, 交換機, 綁定, 消息都是通過虛擬主機管理.

? sessions ?來管理所有的客?端的連接. 記錄每個客?端的 socket.

? serverSocket 是服務器??的 socket

? executorService 這個線程池?來處理響應.

? runnable 這個標志位?來控制服務器的運?停?.

?2.啟動/停?服務器

   public void start() throws IOException {
        System.out.println("[BrokerServer] 啟動!");
        executorService = Executors.newCachedThreadPool();
        try {
            while (runnable) {
                Socket clientSocket = serverSocket.accept();
                // 把處理連接的邏輯丟給這個線程池.
                executorService.submit(() -> {
                    processConnection(clientSocket);
                });
            }
        } catch (SocketException e) {
            System.out.println("[BrokerServer] 服務器停止運行!");
            // e.printStackTrace();
        }
    }

    // 一般來說停止服務器, 就是直接 kill 掉對應進程就行了.
    // 此處還是搞一個單獨的停止方法. 主要是用于后續(xù)的單元測試.
    public void stop() throws IOException {
        runnable = false;
        // 把線程池中的任務都放棄了. 讓線程都銷毀.
        executorService.shutdownNow();
        serverSocket.close();
    }

?3.實現(xiàn)處理連接

?? 對于 EOFException 和 SocketException , 我們視為客?端正常斷開連接.

? 如果是客?端先 close, 后調(diào)? DataInputStream 的 read, 則拋出 EOFException

? 如果是先調(diào)? DataInputStream 的 read, 后客?端調(diào)? close, 則拋出 SocketException

    // 通過這個方法, 來處理一個客戶端的連接.
    // 在這一個連接中, 可能會涉及到多個請求和響應.
    private void processConnection(Socket clientSocket) {
        try (InputStream inputStream = clientSocket.getInputStream();
             OutputStream outputStream = clientSocket.getOutputStream()) {
            // 這里需要按照特定格式來讀取并解析. 此時就需要用到 DataInputStream 和 DataOutputStream
            try (DataInputStream dataInputStream = new DataInputStream(inputStream);
                 DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                while (true) {
                    // 1. 讀取請求并解析.
                    Request request = readRequest(dataInputStream);
                    // 2. 根據(jù)請求計算響應
                    Response response = process(request, clientSocket);
                    // 3. 把響應寫回給客戶端
                    writeResponse(dataOutputStream, response);
                }
            }
        } catch (EOFException | SocketException e) {
            // 對于這個代碼, DataInputStream 如果讀到 EOF , 就會拋出一個 EOFException 異常.
            // 需要借助這個異常來結束循環(huán)
            System.out.println("[BrokerServer] connection 關閉! 客戶端的地址: " + clientSocket.getInetAddress().toString()
                    + ":" + clientSocket.getPort());
        } catch (IOException | ClassNotFoundException | MqException e) {
            System.out.println("[BrokerServer] connection 出現(xiàn)異常!");
            e.printStackTrace();
        } finally {
            try {
                // 當連接處理完了, 就需要記得關閉 socket
                clientSocket.close();
                // 一個 TCP 連接中, 可能包含多個 channel. 需要把當前這個 socket 對應的所有 channel 也順便清理掉.
                clearClosedSession(clientSocket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

?4.實現(xiàn) readRequest

   private Request readRequest(DataInputStream dataInputStream) throws IOException {
        Request request = new Request();
        request.setType(dataInputStream.readInt());
        request.setLength(dataInputStream.readInt());
        byte[] payload = new byte[request.getLength()];
        int n = dataInputStream.read(payload);
        if (n != request.getLength()) {
            throw new IOException("讀取請求格式出錯!");
        }
        request.setPayload(payload);
        return request;
    }

5.實現(xiàn) writeResponse?

    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
        dataOutputStream.writeInt(response.getType());
        dataOutputStream.writeInt(response.getLength());
        dataOutputStream.write(response.getPayload());
        // 這個刷新緩沖區(qū)也是重要的操作!!
        dataOutputStream.flush();
    }

6.實現(xiàn)處理請求

? 先把請求轉換成 BaseArguments , 獲取到其中的 channelId 和 rid

? 再根據(jù)不同的 type, 分別處理不同的邏輯. (主要是調(diào)? virtualHost 中不同的?法).

? 針對消息訂閱操作, 則需要在存在消息的時候通過回調(diào), 把響應結果寫回給對應的客?端.

? 最后構造成統(tǒng)?的響應.

    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        // 1. 把 request 中的 payload 做一個初步的解析.
        BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
        System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
                + ", type=" + request.getType() + ", length=" + request.getLength());
        // 2. 根據(jù) type 的值, 來進一步區(qū)分接下來這次請求要干啥.
        boolean ok = true;
        if (request.getType() == 0x1) {
            // 創(chuàng)建 channel
            sessions.put(basicArguments.getChannelId(), clientSocket);
            System.out.println("[BrokerServer] 創(chuàng)建 channel 完成! channelId=" + basicArguments.getChannelId());
        } else if (request.getType() == 0x2) {
            // 銷毀 channel
            sessions.remove(basicArguments.getChannelId());
            System.out.println("[BrokerServer] 銷毀 channel 完成! channelId=" + basicArguments.getChannelId());
        } else if (request.getType() == 0x3) {
            // 創(chuàng)建交換機. 此時 payload 就是 ExchangeDeclareArguments 對象了.
            ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
            ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
                    arguments.isDurable());
        } else if (request.getType() == 0x4) {
            ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
            ok = virtualHost.exchangeDelete(arguments.getExchangeName());
        } else if (request.getType() == 0x5) {
            QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
            ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable());
        } else if (request.getType() == 0x6) {
            QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
            ok = virtualHost.queueDelete((arguments.getQueueName()));
        } else if (request.getType() == 0x7) {
            QueueBindArguments arguments = (QueueBindArguments) basicArguments;
            ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
        } else if (request.getType() == 0x8) {
            QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
            ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
        } else if (request.getType() == 0x9) {
            BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
            ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
                    arguments.getBasicProperties(), arguments.getBody());
        } else if (request.getType() == 0xa) {
            BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
            ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
                    new Consumer() {
                        // 這個回調(diào)函數(shù)要做的工作, 就是把服務器收到的消息可以直接推送回對應的消費者客戶端
                        @Override
                        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                            // 先知道當前這個收到的消息, 要發(fā)給哪個客戶端.
                            // 此處 consumerTag 其實是 channelId. 根據(jù) channelId 去 sessions 中查詢, 就可以得到對應的
                            // socket 對象了, 從而可以往里面發(fā)送數(shù)據(jù)了
                            // 1. 根據(jù) channelId 找到 socket 對象
                            Socket clientSocket = sessions.get(consumerTag);
                            if (clientSocket == null || clientSocket.isClosed()) {
                                throw new MqException("[BrokerServer] 訂閱消息的客戶端已經(jīng)關閉!");
                            }
                            // 2. 構造響應數(shù)據(jù)
                            SubScribeReturns subScribeReturns = new SubScribeReturns();
                            subScribeReturns.setChannelId(consumerTag);
                            subScribeReturns.setRid(""); // 由于這里只有響應, 沒有請求, 不需要去對應. rid 暫時不需要.
                            subScribeReturns.setOk(true);
                            subScribeReturns.setConsumerTag(consumerTag);
                            subScribeReturns.setBasicProperties(basicProperties);
                            subScribeReturns.setBody(body);
                            byte[] payload = BinaryTool.toBytes(subScribeReturns);
                            Response response = new Response();
                            // 0xc 表示服務器給消費者客戶端推送的消息數(shù)據(jù).
                            response.setType(0xc);
                            // response 的 payload 就是一個 SubScribeReturns
                            response.setLength(payload.length);
                            response.setPayload(payload);
                            // 3. 把數(shù)據(jù)寫回給客戶端.
                            //    注意! 此處的 dataOutputStream 這個對象不能 close !!!
                            //    如果 把 dataOutputStream 關閉, 就會直接把 clientSocket 里的 outputStream 也關了.
                            //    此時就無法繼續(xù)往 socket 中寫入后續(xù)數(shù)據(jù)了.
                            DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                            writeResponse(dataOutputStream, response);
                        }
                    });
        } else if (request.getType() == 0xb) {
            // 調(diào)用 basicAck 確認消息.
            BasicAckArguments arguments = (BasicAckArguments) basicArguments;
            ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
        } else {
            // 當前的 type 是非法的.
            throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
        }
        // 3. 構造響應
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setChannelId(basicArguments.getChannelId());
        basicReturns.setRid(basicArguments.getRid());
        basicReturns.setOk(ok);
        byte[] payload = BinaryTool.toBytes(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(payload.length);
        response.setPayload(payload);
        System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
                + ", type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

?7.實現(xiàn) clearClosedSession

? 如果客?端只關閉了 Connection, 沒關閉 Connection 中包含的 Channel, 也沒關系, 在這?統(tǒng)?進 ?清理.

? 注意迭代器失效問題

    private void clearClosedSession(Socket clientSocket) {
        // 這里要做的事情, 主要就是遍歷上述 sessions hash 表, 把該被關閉的 socket 對應的鍵值對, 統(tǒng)統(tǒng)刪掉.
        List<String> toDeleteChannelId = new ArrayList<>();
        for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
            if (entry.getValue() == clientSocket) {
                // 不能在這里直接刪除!!!
                // 這屬于使用集合類的一個大忌!!! 一邊遍歷, 一邊刪除!!!
                // sessions.remove(entry.getKey());
                toDeleteChannelId.add(entry.getKey());
            }
        }
        for (String channelId : toDeleteChannelId) {
            sessions.remove(channelId);
        }
        System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
    }

??二. 實現(xiàn)客?端

1.創(chuàng)建 ConnectionFactory

public class ConnectionFactory {
    // broker server 的 ip 地址
    private String host;
    // broker server 的端口號
    private int port;

    // 訪問 broker server 的哪個虛擬主機.
    // 下列幾個屬性暫時先都不搞了.
//    private String virtualHostName;
//    private String username;
//    private String password;

    public Connection newConnection() throws IOException {
        Connection connection = new Connection(host, port);
        return connection;
    }
}

?2.Connection 和 Channel 的定義

?個客?端可以創(chuàng)建多個 Connection.

?個 Connection 對應?個 socket, ?個 TCP 連接.

?個 Connection 可以包含多個 Channel

1) Connection 的定義?

public class Connection {
    private Socket socket = null;
    // 需要管理多個 channel. 使用一個 hash 表把若干個 channel 組織起來.
    private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();

    private InputStream inputStream;
    private OutputStream outputStream;
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;
}

? Socket 是客?端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均為 socket 通信的接?.

? channelMap ?來管理該連接中所有的 Channel.

? callbackPool 是?來在客?端這邊執(zhí)???回調(diào)的線程池.

?2) Channel 的定義

public class Channel {
    private String channelId;
    // 當前這個 channel 屬于哪個連接.
    private Connection connection;
    // 用來存儲后續(xù)客戶端收到的服務器的響應.
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    // 如果當前 Channel 訂閱了某個隊列, 就需要在此處記錄下對應回調(diào)是啥. 當該隊列的消息返回回來的時候, 調(diào)用回調(diào).
    // 此處約定一個 Channel 中只能有一個回調(diào).
    private Consumer consumer = null;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }
}

? channelId 為 channel 的?份標識, 使? UUID 標識.

? Connection 為 channel 對應的連接.

? baseReturnsMap ?來保存響應的返回值. 放到這個哈希表中?便和請求匹配.

? consumer 為消費者的回調(diào)(??注冊的). 對于消息響應, 應該調(diào)?這個回調(diào)處理消息.?

?3.封裝請求響應讀寫操作

   public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
        System.out.println("[Connection] 發(fā)送請求! type=" + request.getType() + ", length=" + request.getLength());
    }

    // 讀取響應
    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        byte[] payload = new byte[response.getLength()];
        int n = dataInputStream.read(payload);
        if (n != response.getLength()) {
            throw new IOException("讀取的響應數(shù)據(jù)不完整!");
        }
        response.setPayload(payload);
        System.out.println("[Connection] 收到響應! type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

4.創(chuàng)建 channel??

    public Channel createChannel() throws IOException {
        String channelId = "C-" + UUID.randomUUID().toString();
        Channel channel = new Channel(channelId, this);
        // 把這個 channel 對象放到 Connection 管理 channel 的 哈希表 中.
        channelMap.put(channelId, channel);
        // 同時也需要把 "創(chuàng)建 channel" 的這個消息也告訴服務器.
        boolean ok = channel.createChannel();
        if (!ok) {
            // 服務器這里創(chuàng)建失敗了!! 整個這次創(chuàng)建 channel 操作不順利!!
            // 把剛才已經(jīng)加入 hash 表的鍵值對, 再刪了.
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }

5.發(fā)送請求

1) 創(chuàng)建 channel

   public boolean createChannel() throws IOException {
        // 對于創(chuàng)建 Channel 操作來說, payload 就是一個 basicArguments 對象
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());
        byte[] payload = BinaryTool.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);

        // 構造出完整請求之后, 就可以發(fā)送這個請求了.
        connection.writeRequest(request);
        // 等待服務器的響應
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();

      private String generateRid() {
        return "R-" + UUID.randomUUID().toString();
    }

 private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while ((basicReturns = basicReturnsMap.get(rid)) == null) {
            // 如果查詢結果為 null, 說明包裹還沒回來.
            // 此時就需要阻塞等待.
            synchronized (this) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // 讀取成功之后, 還需要把這個消息從哈希表中刪除掉.
        basicReturnsMap.remove(rid);
        return basicReturns;
    }

    }

?2) 關閉 channel

   // 關閉 channel, 給服務器發(fā)送一個 type = 0x2 的請求
    public boolean close() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setRid(generateRid());
        basicArguments.setChannelId(channelId);
        byte[] payload = BinaryTool.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x2);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }

3) 創(chuàng)建交換機

    // 創(chuàng)建交換機
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);

        Request request = new Request();
        request.setType(0x3);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
        return basicReturns.isOk();
    }

?4) 刪除交換機

    // 刪除交換機
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x4);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

5) 創(chuàng)建隊列

    // 創(chuàng)建隊列
    public boolean queueDeclare(String queueName, boolean durable) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(generateRid());
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);


        byte[] payload = BinaryTool.toBytes(queueDeclareArguments);

        Request request = new Request();
        request.setType(0x5);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
        return basicReturns.isOk();
    }

6) 刪除隊列

    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments arguments = new QueueDeleteArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x6);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

?7) 創(chuàng)建綁定

    public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
        QueueBindArguments arguments = new QueueBindArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setExchangeName(exchangeName);
        arguments.setBindingKey(bindingKey);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x7);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

8) 刪除綁定

    // 解除綁定
    public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
        QueueUnbindArguments arguments = new QueueUnbindArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x8);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

?9) 發(fā)送消息

    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
        BasicPublishArguments arguments = new BasicPublishArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        arguments.setRoutingKey(routingKey);
        arguments.setBasicProperties(basicProperties);
        arguments.setBody(body);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x9);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

10) 訂閱消息

    // 訂閱消息
    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
        // 先設置回調(diào).
        if (this.consumer != null) {
            throw new MqException("該 channel 已經(jīng)設置過消費消息的回調(diào)了, 不能重復設置!");
        }
        this.consumer = consumer;

        BasicConsumeArguments arguments = new BasicConsumeArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setConsumerTag(channelId);  // 此處 consumerTag 也使用 channelId 來表示了.
        arguments.setQueueName(queueName);
        arguments.setAutoAck(autoAck);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

?11) 確認消息

    // 確認消息
    public boolean basicAck(String queueName, String messageId) throws IOException {
        BasicAckArguments arguments = new BasicAckArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setMessageId(messageId);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0xb);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

6.處理響應

1) 創(chuàng)建掃描線程

創(chuàng)建?個掃描線程, ?來不停的讀取 socket 中的響應數(shù)據(jù).

  public Connection(String host, int port) throws IOException {
        socket = new Socket(host, port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);

        callbackPool = Executors.newFixedThreadPool(4);

        // 創(chuàng)建一個掃描線程, 由這個線程負責不停的從 socket 中讀取響應數(shù)據(jù). 把這個響應數(shù)據(jù)再交給對應的 channel 負責處理.
        Thread t = new Thread(() -> {
            try {
                while (!socket.isClosed()) {
                    Response response = readResponse();
                    dispatchResponse(response);
                }
            } catch (SocketException e) {
                // 連接正常斷開的. 此時這個異常直接忽略.
                System.out.println("[Connection] 連接正常斷開!");
            } catch (IOException | ClassNotFoundException | MqException e) {
                System.out.println("[Connection] 連接異常斷開!");
                e.printStackTrace();
            }
        });
        t.start();
    }

2) 實現(xiàn)響應的分發(fā)

給 Connection 創(chuàng)建 dispatchResponse ?法.

? 針對服務器返回的控制響應和消息響應, 分別處理.

? 如果是訂閱數(shù)據(jù), 則調(diào)? channel 中的回調(diào).

? 如果是控制消息, 直接放到結果集合中.

    // 使用這個方法來分別處理, 當前的響應是一個針對控制請求的響應, 還是服務器推送的消息.
    private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if (response.getType() == 0xc) {
            // 服務器推送來的消息數(shù)據(jù)
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
            // 根據(jù) channelId 找到對應的 channel 對象
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 該消息對應的 channel 在客戶端中不存在! channelId=" + channel.getChannelId());
            }
            // 執(zhí)行該 channel 對象內(nèi)部的回調(diào).
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                            subScribeReturns.getBody());
                } catch (MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            // 當前響應是針對剛才的控制請求的響應
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
            // 把這個結果放到對應的 channel 的 hash 表中.
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 該消息對應的 channel 在客戶端中不存在! channelId=" + channel.getChannelId());
            }
            channel.putReturns(basicReturns);
        }
    }

3) 實現(xiàn) channel.putReturns?

?把響應放到響應的 hash 表中, 同時喚醒等待響應的線程去消費.

    public void putReturns(BasicReturns basicReturns) {
        basicReturnsMap.put(basicReturns.getRid(), basicReturns);
        synchronized (this) {
            // 當前也不知道有多少個線程在等待上述的這個響應.
            // 把所有的等待的線程都喚醒.
            notifyAll();
        }
    }

?7.關閉 Connection

   public void close() {
        // 關閉 Connection 釋放上述資源
        try {
            callbackPool.shutdownNow();
            channelMap.clear();
            inputStream.close();
            outputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

?8.測試客?端-服務器

public class MqClientTests {
    private BrokerServer brokerServer = null;
    private ConnectionFactory factory = null;
    private Thread t = null;

    @BeforeEach
    public void setUp() throws IOException {
        // 1. 先啟動服務器
        MqApplication.context = SpringApplication.run(MqApplication.class);
        brokerServer = new BrokerServer(9090);
        t = new Thread(() -> {
            // 這個 start 方法會進入一個死循環(huán). 使用一個新的線程來運行 start 即可!
            try {
                brokerServer.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t.start();

        // 2. 配置 ConnectionFactory
        factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
    }

    @AfterEach
    public void tearDown() throws IOException {
        // 停止服務器
        brokerServer.stop();
        // t.join();
        MqApplication.context.close();

        // 刪除必要的文件
        File file = new File("./data");
        FileUtils.deleteDirectory(file);

        factory = null;
    }

    @Test
    public void testConnection() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
    }

    @Test
    public void testChannel() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);
    }

    @Test
    public void testExchange() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        Assertions.assertTrue(ok);

        ok = channel.exchangeDelete("testExchange");
        Assertions.assertTrue(ok);

        // 此處穩(wěn)妥起見, 把改關閉的要進行關閉.
        channel.close();
        connection.close();
    }

    @Test
    public void testQueue() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = channel.queueDelete("testQueue");
        Assertions.assertTrue(ok);

        channel.close();
        connection.close();
    }

    @Test
    public void testBinding() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        Assertions.assertTrue(ok);
        ok = channel.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");
        Assertions.assertTrue(ok);

        ok = channel.queueUnbind("testQueue", "testExchange");
        Assertions.assertTrue(ok);

        channel.close();
        connection.close();
    }

    @Test
    public void testMessage() throws IOException, MqException, InterruptedException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        Assertions.assertTrue(ok);
        ok = channel.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        byte[] requestBody = "hello".getBytes();
        ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);
        Assertions.assertTrue(ok);

        ok = channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消費數(shù)據(jù)] 開始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                Assertions.assertArrayEquals(requestBody, body);
                System.out.println("[消費數(shù)據(jù)] 結束!");
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        channel.close();
        connection.close();
    }
}

?三. 案例: 基于 MQ 的?產(chǎn)者消費者模型

1.生產(chǎn)者

/*
 * 這個類用來表示一個生產(chǎn)者.
 * 通常這是一個單獨的服務器程序.
 */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("啟動生產(chǎn)者");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 創(chuàng)建交換機和隊列
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        channel.queueDeclare("testQueue", true);

        // 創(chuàng)建一個消息并發(fā)送
        byte[] body = "hello".getBytes();
        boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
        System.out.println("消息投遞完成! ok=" + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

?2.消費者

/*
 * 這個類表示一個消費者.
 * 通常這個類也應該是在一個獨立的服務器中被執(zhí)行
 */
public class DemoConsumer {
    public static void main(String[] args) throws IOException, MqException, InterruptedException {
        System.out.println("啟動消費者!");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        channel.queueDeclare("testQueue", true);

        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消費數(shù)據(jù)] 開始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                String bodyString = new String(body, 0, body.length);
                System.out.println("body=" + bodyString);
                System.out.println("[消費數(shù)據(jù)] 結束!");
            }
        });

        // 由于消費者也不知道生產(chǎn)者要生產(chǎn)多少, 就在這里通過這個循環(huán)模擬一直等待消費.
        while (true) {
            Thread.sleep(500);
        }
    }
}

3.運行結果

《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式《消息隊列MyMQ》——參考RabbitMQ實現(xiàn),rabbitmq,分布式

十四.總結

本文所有代碼已上傳?wuchangsheng1/mq (github.com)文章來源地址http://www.zghlxwxcb.cn/news/detail-765036.html

到了這里,關于《消息隊列MyMQ》——參考RabbitMQ實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 分布式搜索引擎(Elastic Search)+消息隊列(RabbitMQ)部署(商城4)

    分布式搜索引擎(Elastic Search)+消息隊列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于實現(xiàn)全文搜索功能,例如商城中對商品搜索、搜索、分類搜索、訂單搜索、客戶搜索等。它支持復雜的查詢語句、中文分詞、近似搜索等功能,可以快速地搜索并返回匹配的結果。 2、日志分析 Elastic search可以用于實現(xiàn)實時日志分析,例

    2024年02月04日
    瀏覽(21)
  • RabbitMQ實現(xiàn)延遲消息,RabbitMQ使用死信隊列實現(xiàn)延遲消息,RabbitMQ延時隊列插件

    RabbitMQ實現(xiàn)延遲消息,RabbitMQ使用死信隊列實現(xiàn)延遲消息,RabbitMQ延時隊列插件

    假設有一個業(yè)務場景:超過30分鐘未付款的訂單自動關閉,這個功能應該怎么實現(xiàn)? RabbitMQ使用死信隊列,可以實現(xiàn)消息的延遲接收。 隊列有一個消息過期屬性。就像豐巢超過24小時就收費一樣,通過設置這個屬性,超過了指定事件的消息將會被丟棄。 這個屬性交:x-message

    2024年02月13日
    瀏覽(104)
  • 消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot

    消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot

    1、延遲隊列概念 延時隊列內(nèi)部是有序的 , 最重要的特性 就體現(xiàn)在它的 延時屬性 上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說, 延時隊列就是用來存放需要在指定時間被處理的元素的隊列。 延遲隊列使用場景: 訂單在十分鐘之內(nèi)未支付則

    2024年02月22日
    瀏覽(20)
  • 分布式消息隊列RabbitMQ-Linux下服務搭建,面試完騰訊我才發(fā)現(xiàn)這些知識點竟然沒掌握全

    分布式消息隊列RabbitMQ-Linux下服務搭建,面試完騰訊我才發(fā)現(xiàn)這些知識點竟然沒掌握全

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 5.修改配置文件 這里面修改{loopback_users, [“guest”]}改為{loopback_users, []} {application, rabbit, %% - - erlang - - [{description, “RabbitMQ”}, {id, “RabbitMQ”}, {vsn, “3.6.5”}, {modules, [‘background_gc’,‘delegate’,‘delegate_sup’,‘dtree’,‘file_han

    2024年04月14日
    瀏覽(27)
  • C#調(diào)用RabbitMQ實現(xiàn)消息隊列

    C#調(diào)用RabbitMQ實現(xiàn)消息隊列

    前言 前幾天在做日志收集,用到了RabbitMQ,它作為一種中間件,需要對其進行下載,安裝,和配置。 消息隊列 什么是消息隊列?,我們這樣想一下,用戶訪問網(wǎng)站,最終是要將數(shù)據(jù)以HTTP的協(xié)議的方式,通過網(wǎng)絡傳輸?shù)街鳈C的某個端口上的。 那么,接收數(shù)據(jù)的方式是什么呢?

    2024年02月05日
    瀏覽(18)
  • SpringBoot RabbitMQ 實現(xiàn)消息隊列功能

    作者:禪與計算機程序設計藝術 在企業(yè)級應用中,為了提升系統(tǒng)性能、降低響應延遲、改善用戶體驗、增加系統(tǒng)的穩(wěn)定性、提高資源利用率等方面所需的功能之一就是使用消息隊列。RabbitMQ是一個開源的AMQP(Advanced Message Queuing Protocol)的實現(xiàn)消息隊列,它是用Erlang語言開發(fā)的。

    2024年02月09日
    瀏覽(24)
  • SSM 如何使用 RabbitMQ 實現(xiàn)消息隊列

    SSM 如何使用 RabbitMQ 實現(xiàn)消息隊列

    在分布式系統(tǒng)中,消息隊列是一種常見的通信方式,可以實現(xiàn)不同服務之間的異步通信和解耦。RabbitMQ 是一個開源的消息隊列軟件,本文將介紹如何在 SSM 框架中使用 RabbitMQ 實現(xiàn)消息隊列。 本文將使用 Spring Boot 作為 SSM 框架,使用 Maven 進行項目管理。 在開始之前,需要安裝

    2024年02月06日
    瀏覽(21)
  • SpringCloud-實現(xiàn)基于RabbitMQ的消息隊列

    消息隊列是現(xiàn)代分布式系統(tǒng)中常用的通信機制,用于在不同的服務之間傳遞消息。在Spring Cloud框架中,我們可以利用RabbitMQ實現(xiàn)強大而可靠的消息隊列系統(tǒng)。本篇博客將詳細介紹如何在Spring Cloud項目中集成RabbitMQ,并創(chuàng)建一個簡單的消息隊列。 這里是一個簡單的RabbitMQ消息隊列

    2024年03月11日
    瀏覽(24)
  • 模擬實現(xiàn)消息隊列(以 RabbitMQ 為藍本)

    模擬實現(xiàn)消息隊列(以 RabbitMQ 為藍本)

    核心概念1 生產(chǎn)者(Producer):生產(chǎn)者負責生成數(shù)據(jù)并將其放入緩沖區(qū)(隊列)中。生產(chǎn)者可以是一個線程或多個線程,它們可以并行地生成數(shù)據(jù)。當緩沖區(qū)(隊列)已滿時,生產(chǎn)者需要等待,直到有空間可用。 消費者(Consumer):消費者負責從緩沖區(qū)(隊列)中取出數(shù)據(jù)并進行處

    2024年02月13日
    瀏覽(23)
  • RabbitMQ系列(8)--實現(xiàn)RabbitMQ隊列持久化及消息持久化

    RabbitMQ系列(8)--實現(xiàn)RabbitMQ隊列持久化及消息持久化

    概念:在上一章文章中我們演示了消費者宕機的情況下消息沒有被消費成功后會重新入隊,然后再被消費,但如何保障RabbitMQ服務停掉的情況下,生產(chǎn)者發(fā)過來的消息不會丟失,這時候我們?yōu)榱讼⒉粫G失就需要將隊列和消息都標記為持久化。 1、實現(xiàn)RabbitMQ隊列持久化 只需

    2024年02月09日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包