目錄
前言
1.?創(chuàng)建MemoryDataCenter
2. 封裝Exchange 和 Queue方法
3. 封裝Binding操作
4. 封裝Message操作
4.1 封裝消息中心集合messageMap
4.2 封裝消息與隊列的關(guān)系集合queueMessageMap的操作
5. 封裝未確認(rèn)消息集合waitMessage的操作
6. 從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存中
7. MemoryDataCenter單元測試
結(jié)語
前言
? ? ? ? 上一節(jié)我們總結(jié)了服務(wù)器模塊的硬盤管理,將交換機(jī),隊列,綁定存書到Sqlite數(shù)據(jù)庫中,將消息按照隊進(jìn)行創(chuàng)建文件存儲在本地硬盤中.并且封裝了對于數(shù)據(jù)庫和文件的各種操作.實現(xiàn)了持久化的效果,但是實際的消息存儲/轉(zhuǎn)發(fā),主要靠內(nèi)存的結(jié)構(gòu).對于消息隊列來說,內(nèi)存部分是更關(guān)鍵的,內(nèi)存速度更快,可以達(dá)到更高的并發(fā).本節(jié)就對內(nèi)存管理進(jìn)行封裝.本項目全部代碼已上傳Gitee,鏈接放在文章末尾,歡迎大家訪問!
1.?創(chuàng)建MemoryDataCenter
路徑:mqserver.datacenter.MemoryDataCenter
考慮到多線程的原因,我們將HashMap替換成ConcurrentHashMap (對每個哈希桶進(jìn)行加鎖,相對來說是線程安全的)
@Data
public class MemoryDataCenter {
// 1. 交換機(jī) 多線程環(huán)境下使用,使用ConcurrentHashMap會相對線程安全
// key:ExchangeName,value:Exchange對象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
// 2. 隊列 key:QueueName,value:MSQueue對象
private ConcurrentHashMap<String, MSQueue> queueMap = new ConcurrentHashMap<>();
// 3. 綁定 key:ExchangeName,value:HashMap(key:QueueName,value:MSQueue對象)
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
// 4. 消息 key:MessageID,value:Message對象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
// 5. 消息和隊列的映射關(guān)系 HashMap: key:QueueName,value:LinkedList(Message對象)
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
// 6. 未確認(rèn)的消息 HashMap: key:QueueName,value:HashMap(key:MessageID,value:Message對象)
private ConcurrentHashMap<String,ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
}
2. 封裝Exchange 和 Queue方法
主要就是插入和獲取數(shù)據(jù)以及刪除
/**
* 1. 針對內(nèi)存中的交換機(jī),隊列設(shè)置操作
*/
public void insertExchange(Exchange exchange) {
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter] 新交換機(jī)添加成功! exchangeName=" + exchange.getName());
}
public Exchange getExchange(String exchangeName) {
return exchangeMap.get(exchangeName);
}
public void deleteExchange(String exchangeName) {
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 交換機(jī)刪除成功! exchangeName=" + exchangeName);
}
public void insertQueue(MSQueue queue) {
queueMap.put(queue.getName(), queue);
System.out.println("[MemoryDataCenter] 新隊列添加成功! queueName=" + queue.getName());
}
public MSQueue getQueue(String queueName) {
return queueMap.get(queueName);
}
public void deleteQueue(String queueName) {
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 隊列刪除成功! queueName=" + queueName);
}
3. 封裝Binding操作
這里呢之所以將綁定的操作單獨列舉出來,是因為存儲綁定信息的數(shù)據(jù)結(jié)構(gòu)是相對比較復(fù)雜的,是嵌套的HashMap.
對于插入綁定信息:
1, 首先按照交換機(jī)的名字進(jìn)行查找,如果查找不到就進(jìn)行創(chuàng)建一個HashMap的數(shù)據(jù)結(jié)構(gòu)存儲到含有綁定信息的HashMap中,如果存在的話在按照隊列名字進(jìn)行查找綁定信息,如果查找到了,說明改綁定信息已經(jīng)插入過就不要進(jìn)行插入了,如果沒找到就進(jìn)行插入操作.
2. 在上述查找和插入的操作比并不是原子的,所以我們要給是上述操作,按照bindingMap進(jìn)行加鎖.以保證我們的線程操作是安全的.
下述是相關(guān)對于綁定的操作的代碼:
/**
* 2. 針對綁定進(jìn)行操作
*/
/**
* 2.1插入綁定信息
* @param binding
* @throws MqException
*/
public void insertBinding(Binding binding) throws MqException {
// ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
// if (bindingMap == null) {
// bindingMap = new ConcurrentHashMap<>();
// bindingsMap.put(binding.getExchangeName(), bindingMap);
// }
// 先使用 exchangeName 查一下, 對應(yīng)的哈希表是否存在. 不存在就創(chuàng)建一個.
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap<>());
synchronized (bindingMap) {
// 再根據(jù) queueName 查一下目前的綁定的交換機(jī)綁定的是否是當(dāng)前傳入的隊列. 如果已經(jīng)存在(存在相同的綁定關(guān)系了,就不需要進(jìn)行傳入), 就拋出異常. 不存在才能插入.
if (bindingMap.get(binding.getQueueName()) != null) {
throw new MqException("[MemoryDataCenter] 綁定已經(jīng)存在! exchangeName=" + binding.getExchangeName() +
", queueName=" + binding.getQueueName());
}
// 最后將綁定關(guān)系傳入到bingMap中
bindingMap.put(binding.getQueueName(), binding);
}
System.out.println("[MemoryDataCenter] 新綁定添加成功! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
/**
* 2.2 獲取綁定1: 根據(jù)exchangeName, queueName 獲取唯一的綁定
* @param exchangeName
* @param queueName
*/
public Binding getBinding(String exchangeName, String queueName){
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
if (bindingMap == null){
return null;
}
synchronized (bindingMap){
// 防止當(dāng)別的操作刪除了這個隊列的綁定信息,而導(dǎo)致的線程錯誤
return bindingMap.get(queueName);
}
}
/**
* 2.3 獲取綁定2: 根據(jù)exchangeName 查詢所有綁定
* @param exchangeName
* @return
*/
public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) throws MqException {
if (bindingsMap.get(exchangeName) == null){
return null;
}
return bindingsMap.get(exchangeName);
}
/**
* 2.4 刪除綁定關(guān)系(單個) 一個交換機(jī)對應(yīng)的單個隊列的綁定關(guān)系
* @param binding
* @throws MqException
*/
public void deleteBinding(Binding binding) throws MqException {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if (bindingMap == null) {
// 該交換機(jī)沒有綁定任何隊列. 報錯.
throw new MqException("[MemoryDataCenter] 綁定不存在! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 綁定刪除成功! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
/**
* 2.5 刪除綁定關(guān)系(多個) 1個交換機(jī)對應(yīng)的多個隊列的綁定關(guān)系.
*/
public void deleteBinding(String exchangeName){
bindingsMap.remove(exchangeName);
}
4. 封裝Message操作
4.1 封裝消息中心集合messageMap
- 1. 添加消息到消息中心
- 2. 根據(jù)消息ID查詢消息
- 3. 根據(jù)消息ID刪除消息
/**
* 3. 針對消息進(jìn)行操作
*/
/**
* 3.1 添加消息
* @param message
*/
public void addMessage(Message message) {
messageMap.put(message.getMessageID(), message);
System.out.println("[MemoryDataCenter] 新消息添加成功! messageId=" + message.getMessageID());
}
/**
* 3.2 根據(jù) id 查詢消息
* @param messageId
* @return
*/
public Message getMessage(String messageId) {
return messageMap.get(messageId);
}
/**
* 3.3 根據(jù) id 刪除消息
* @param messageId
*/
public void removeMessage(String messageId) {
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息被移除! messageId=" + messageId);
}
4.2 封裝消息與隊列的關(guān)系集合queueMessageMap的操作
- 1. 發(fā)送消息到指定隊列名字的隊列
- 2. 從指定隊列中獲取消息集合
- 3. 獲取指定隊列名字隊列中消息的個數(shù)
/**
* 4 針對消息和隊列的關(guān)系進(jìn)行操作
*/
/**
* 4.1 發(fā)送消息到指定隊列
* @param queue
* @param message
*/
public void sendMessage(MSQueue queue, Message message) {
// 先根據(jù)隊列的名字, 找到該隊列對應(yīng)的消息鏈表.
// 先根據(jù)隊列的名字進(jìn)行查詢,查不到就進(jìn)行創(chuàng)建該隊列對應(yīng)的鏈表 // computeIfAbsent線程安全的
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k-> new LinkedList<>());
// 再把數(shù)據(jù)加到 messages 里面
synchronized (messages) {
// 對該隊列進(jìn)行添加的時候需要進(jìn)行加鎖
messages.add(message);
}
// 在這里把該消息也往消息中心中插入一下. 假設(shè)如果 message 已經(jīng)在消息中心存在, 重復(fù)插入也沒關(guān)系.
// 主要就是相同 messageId, 對應(yīng)的 message 的內(nèi)容一定是一樣的. (服務(wù)器代碼不會對 Message 內(nèi)容做修改 basicProperties 和 body)
addMessage(message);
System.out.println("[MemoryDataCenter] 消息被添加到隊列中! messageId=" + message.getMessageID());
}
/**
* 4.2 從指定隊列名字中進(jìn)行提取信息
* @param queueName
* @return
*/
public Message pollMessage(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
// 隊列中沒有信息
if (messages == null){
System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
return null;
}
// 將隊列進(jìn)行頭刪除(提取信息)
synchronized (messages){
if (messages.size() == 0){
System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
return null;
}
Message currentMessage = messages.remove(0); System.out.println
("[MemoryDataCenter] 消息已經(jīng)從隊列中取出! queueName=" + queueName + ", MessageID=" + currentMessage.getMessageID() );
return currentMessage;
}
}
/**
* 4.3 獲取指定隊列名字中消息的個數(shù)
* @param queueName
* @return
*/
public int getMessageCount(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
// 隊列中沒有信息
if (messages == null){
System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
return 0;
}
// 將隊列進(jìn)行頭刪除(提取信息)
synchronized (messages){
if (messages.size() == 0){
System.out.println("[MemoryDataCenter] 該隊列中沒有信息! queueName=" + queueName);
return 0;
}
return messages.size();
}
}
5. 封裝未確認(rèn)消息集合waitMessage的操作
- 1.?添加消息到等待確認(rèn)隊列
- 2. 從指定未確認(rèn)隊列中刪除消息
- 3. 根據(jù)指定的消息ID與未確認(rèn)隊列名字獲取消息內(nèi)容
/**
* 5. 未確認(rèn)消息Map的操作
*/
/**
* 5.1 添加消息到指定等待確認(rèn)隊列
* @param queueName
* @param message
*/
public void addMessageWaitAck(String queueName, Message message){
ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap
.computeIfAbsent(queueName, k-> new ConcurrentHashMap<>());
waitMessage.put(message.getMessageID(),message);
System.out.println("[MemoryDataCenter] 消息進(jìn)入等待確認(rèn)隊列! messageID=" + message.getMessageID());
}
/**
* 5.2 從指定的未確認(rèn)消息隊列中進(jìn)行刪除消息
* @param queueName
* @param messageId
*/
public void removeMessageWaitAck(String queueName, String messageId){
ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap.get(queueName);
if (waitMessage == null){
System.out.println("[MemoryDataCenter] 該隊列為空! queueName=" + queueName);
return;
}
waitMessage.remove(messageId);
System.out.println("[MemoryDataCenter] 消息已經(jīng)從等待確認(rèn)隊列中移除! messageId=" + messageId);
}
/**
* 5.3 根據(jù)指定消息ID從隊列中進(jìn)行獲取信息
* @param queueName
* @param messageId
* @return
*/
public Message geMessageWaitAck(String queueName, String messageId){
ConcurrentHashMap<String,Message> waitMessage = queueMessageWaitAckMap.get(queueName);
if (waitMessage == null){
System.out.println("[MemoryDataCenter] 該隊列為空! queueName=" + queueName);
return null;
}
return waitMessage.get(messageId);
}
6. 從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存中
使用之前封裝過的diskDataCenter進(jìn)行恢復(fù)數(shù)據(jù).
1. 清空當(dāng)前內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)
2. 恢復(fù)所有的交換機(jī),隊列,綁定,消息數(shù)據(jù),恢復(fù)消息數(shù)據(jù)的時候,要將消息中心和消息與隊列的映射進(jìn)行恢復(fù).
/**
* 6. 從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存中 (使用之前封裝好的管理硬盤的類進(jìn)行實現(xiàn))
*/
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
// 1. 清空內(nèi)存中各種數(shù)據(jù)信息
queueMap.clear();
exchangeMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageMap.clear();
// 2. 恢復(fù)所有的交換機(jī)信息
List<Exchange> exchanges = diskDataCenter.selectAllExchange();
for (Exchange exchange :exchanges) {
exchangeMap.put(exchange.getName(),exchange);
}
// 3. 恢復(fù)所有的隊列信息
List<MSQueue> queues = diskDataCenter.selectAllMSQueue();
for (MSQueue msQueue :queues) {
queueMap.put(msQueue.getName(),msQueue);
}
// 4. 恢復(fù)所有的綁定數(shù)據(jù)
List<Binding> bindings = diskDataCenter.selectAllBinding();
for (Binding binding: bindings){
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.
computeIfAbsent(binding.getExchangeName(), k-> new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(),binding);
}
// 4. 恢復(fù)所有的消息數(shù)據(jù)
// 4.1 遍歷所有的隊列
// List<MSQueue> queues = diskDataCenter.selectAllMSQueue();
for (MSQueue msQueue:queues) {
LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(msQueue.getName());
// 4.2 將獲取的消息進(jìn)行進(jìn)行加入到隊列
queueMessageMap.put(msQueue.getName(),messages);
// 4.3 將消息添加上到消息中心
for (Message message : messages) {
messageMap.put(message.getMessageID(),message);
}
}
7. MemoryDataCenter單元測試
package com.example.demo.mqserver.datacenter;
import com.example.demo.DemoApplication;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import static org.junit.jupiter.api.Assertions.*;
/**
* Created with IntelliJ IDEA.
* Description:
* User: YAO
* Date: 2023-07-31
* Time: 10:30
*/
@SpringBootTest
class MemoryDataCenterTest {
MemoryDataCenter memoryDataCenter = null;
@BeforeEach
void setUp() {
memoryDataCenter = new MemoryDataCenter();
}
@AfterEach
void tearDown() {
memoryDataCenter = null;
}
// 創(chuàng)建一個測試交換機(jī)
private Exchange createTestExchange(String exchangeName) {
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.DIRECT);
exchange.setAutoDelete(false);
exchange.setDurable(true);
return exchange;
}
// 創(chuàng)建一個測試隊列
private MSQueue createTestQueue(String queueName) {
MSQueue queue = new MSQueue();
queue.setName(queueName);
queue.setDurable(true);
queue.setExclusive(false);
queue.setAutoDelete(false);
return queue;
}
/**
* 1. 針對交換機(jī)進(jìn)行操作
*/
@Test
public void testExchange(){
// 1. 創(chuàng)建交換機(jī)進(jìn)行插入
Exchange expectExchange = createTestExchange("testExchange");
memoryDataCenter.insertExchange(expectExchange);
// 2. 查詢交換機(jī)
Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
// 比較內(nèi)存中的引用是否是同一個引用
Assertions.assertEquals(expectExchange,actualExchange);
// 3. 刪除交換機(jī)
memoryDataCenter.deleteExchange("testExchange");
// 4. 查詢交換機(jī),比較結(jié)果
actualExchange = memoryDataCenter.getExchange("testExchange");
Assertions.assertNull(actualExchange);
}
/**
* 2. 針對隊列進(jìn)行操作
*/
@Test
public void testQueue(){
// 1. 創(chuàng)建交換機(jī)進(jìn)行插入
MSQueue expectQueue = createTestQueue("testQueue");
memoryDataCenter.insertQueue(expectQueue);
// 2. 查詢交換機(jī)
MSQueue actualQueue = memoryDataCenter.getQueue("testQueue");
// 比較內(nèi)存中的引用是否是同一個引用
Assertions.assertEquals(expectQueue,actualQueue);
// 3. 刪除交換機(jī)
memoryDataCenter.deleteQueue("testQueue");
// 4. 查詢交換機(jī),比較結(jié)果
actualQueue = memoryDataCenter.getQueue("testQueue");
Assertions.assertNull(actualQueue);
}
/**
* 3. 針對綁定進(jìn)行測試
*/
@Test
public void testBinding() throws MqException {
// 1.創(chuàng)建綁定并加入到集合中
Binding expectedBinding = new Binding();
expectedBinding.setExchangeName("testExchange");
expectedBinding.setQueueName("testQueue");
memoryDataCenter.insertBinding(expectedBinding);
// 2. 查詢綁定(單個)
Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
Assertions.assertEquals(expectedBinding,actualBinding);
// 2.1 查詢所有的綁定
ConcurrentHashMap<String, Binding> bindingMap = memoryDataCenter.getBindings("testExchange");
Assertions.assertEquals(1, bindingMap.size());
Assertions.assertEquals(expectedBinding, bindingMap.get("testQueue"));
// 3. 刪除綁定
memoryDataCenter.deleteBinding("testExchange");
actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
Assertions.assertNull(actualBinding);
bindingMap = memoryDataCenter.getBindings("testExchange");
Assertions.assertNull(bindingMap);
}
private Message createTestMessage(String content) {
Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
return message;
}
/**
* 4. 針對消息進(jìn)行測試
*/
@Test
public void testMessage(){
// 1. 創(chuàng)建消息并插入
Message expectedMessage = createTestMessage("testMessage");
memoryDataCenter.addMessage(expectedMessage);
// 2. 查詢消息并比較
Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageID());
Assertions.assertEquals(expectedMessage, actualMessage);
// 4. 刪除消息
memoryDataCenter.removeMessage(expectedMessage.getMessageID());
// 5. 查詢消息并比較
actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageID());
Assertions.assertNull(actualMessage);
}
/**
* 5. 測試將消息發(fā)送到對列中
*/
@Test
public void sendMessage(){
// 1. 創(chuàng)建一個隊列. 創(chuàng)建10條消息,進(jìn)行插入到隊列
MSQueue expectQueue = createTestQueue("testQueue");
List<Message> expectMessage = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = createTestMessage("testMessage" + i);
memoryDataCenter.sendMessage(expectQueue,message);
expectMessage.add(message);
}
// 2.從隊列進(jìn)行取出消息
List<Message> actualMessage = new ArrayList<>();
while (true){
Message message = memoryDataCenter.pollMessage("testQueue");
if (message == null){
break;
}
actualMessage.add(message);
}
// 3. 比較消息前后是否一致
Assertions.assertEquals(expectMessage.size(),actualMessage.size());
for (int i = 0; i < expectMessage.size(); i++) {
Assertions.assertEquals(expectMessage.get(i),actualMessage.get(i));
}
}
/**
* 6. 測試未被確認(rèn)的消息
*/
@Test
public void testMessageWaitAck(){
// 1. 創(chuàng)建消息,插入到未被確認(rèn)的隊列中
Message expectedMessage = createTestMessage("expectedMessage");
memoryDataCenter.addMessageWaitAck("testQueue", expectedMessage);
// 2. 獲取消息從未被確認(rèn)的隊列中
Message actualMessage = memoryDataCenter.geMessageWaitAck("testQueue", expectedMessage.getMessageID());
Assertions.assertEquals(expectedMessage, actualMessage);
// 3. 從未被確認(rèn)的隊列中進(jìn)行刪除消息
memoryDataCenter.removeMessageWaitAck("testQueue", expectedMessage.getMessageID());
// 4. 比較刪除之后的隊列是否還有消息
actualMessage = memoryDataCenter.geMessageWaitAck("testQueue", expectedMessage.getMessageID());
Assertions.assertNull(actualMessage);
}
/**
* 7. 測試從硬盤中恢復(fù)數(shù)據(jù)到內(nèi)存
*/
@Test
public void testRecovery() throws IOException, MqException, ClassNotFoundException {
// 由于后續(xù)需要進(jìn)行數(shù)據(jù)庫操作, 依賴 MyBatis. 就需要先啟動 SpringApplication, 這樣才能進(jìn)行后續(xù)的數(shù)據(jù)庫操作.
DemoApplication.context = SpringApplication.run(DemoApplication.class);
// 1. 在硬盤上構(gòu)造好數(shù)據(jù)
DiskDataCenter diskDataCenter = new DiskDataCenter();
diskDataCenter.init();
// 構(gòu)造交換機(jī)
Exchange expectedExchange = createTestExchange("testExchange");
diskDataCenter.insertExchange(expectedExchange);
// 構(gòu)造隊列
MSQueue expectedQueue = createTestQueue("testQueue");
diskDataCenter.insertQueue(expectedQueue);
// 構(gòu)造綁定
Binding expectedBinding = new Binding();
expectedBinding.setExchangeName("testExchange");
expectedBinding.setQueueName("testQueue");
expectedBinding.setBindingKey("testBindingKey");
diskDataCenter.insertBinding(expectedBinding);
// 構(gòu)造消息
Message expectedMessage = createTestMessage("testContent");
diskDataCenter.sendMessage(expectedQueue, expectedMessage);
// 2. 執(zhí)行恢復(fù)操作
memoryDataCenter.recovery(diskDataCenter);
// 3. 對比結(jié)果
Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());
Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());
Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());
Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());
MSQueue actualQueue = memoryDataCenter.getQueue("testQueue");
Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());
Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());
Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());
Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());
Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());
Assertions.assertEquals(expectedBinding.getQueueName(), actualBinding.getQueueName());
Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());
Message actualMessage = memoryDataCenter.pollMessage("testQueue");
Assertions.assertEquals(expectedMessage.getMessageID(), actualMessage.getMessageID());
Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
// 4. 清理硬盤的數(shù)據(jù), 把整個 data 目錄里的內(nèi)容都刪掉(包含了 meta.db 和 隊列的目錄).
DemoApplication.context.close();
File dataDir = new File("./data");
FileUtils.deleteDirectory(dataDir);
}
}
結(jié)語
????????以上內(nèi)容就是針對內(nèi)存管理的封裝,主要是設(shè)計了6中數(shù)據(jù)機(jī)構(gòu)進(jìn)行存儲交換機(jī) 隊列 綁定 消息 消息和隊列的映射 未確認(rèn)信息.后續(xù)對數(shù)據(jù)進(jìn)行操作的時候會更加具有效率.這樣我們虛擬主機(jī)中兩大核心部分:硬盤管理和內(nèi)存管理都總結(jié)完成,下一節(jié)會對上述兩種操作進(jìn)一步封裝到(VirtualHost)中,然后正式的提出消息隊列服務(wù)器BrokerServer這個概念,對其進(jìn)行完善和功能封裝.請持續(xù)關(guān)注,謝謝!!!
完整的項目代碼已上傳Gitee,歡迎大家訪問.??????文章來源:http://www.zghlxwxcb.cn/news/detail-634032.html
模擬實現(xiàn)消息隊列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq文章來源地址http://www.zghlxwxcb.cn/news/detail-634032.html
到了這里,關(guān)于模擬實現(xiàn)消息隊列項目(系列4) -- 服務(wù)器模塊(內(nèi)存管理)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!