目錄
前言
1. 創(chuàng)建VirtualHost
1.1 定義虛擬主機的相關(guān)屬性
1.2 VirtualHost 構(gòu)造方法?
1.3 交換機和隊列的創(chuàng)建和刪除
1.3.1 交換機操作
1.3.2 隊列操作?
1.4 綁定的創(chuàng)建和刪除
1.5?發(fā)送消息到指定的隊列/交換機
2. 實現(xiàn)路由規(guī)則Router
2.1?checkBindingKey()
2.2?checkRoutingKey()
2.3?route()
2.4 單元測試
3. 訂閱消息
3.1 添加一個訂閱者
3.2 創(chuàng)建訂閱者管理類ConsumerManager
3.3 訂閱消息小結(jié)
4. 消息確認?basicAck()
5. VirtualHost單元測試
結(jié)語
前言
? ? ? ? 寫到這里,內(nèi)存和硬盤的數(shù)據(jù)就組織完畢了,接下來我們就會引入在消息隊列初識中提出的一個概念 --- 虛擬主機.簡單回顧一下虛擬主機的概念: 它類似于MySQL的database,是一個邏輯的集合,一個BrokerServer上可以存在多個VirtualHost.在一個BrokerServer上可以組織不同的數(shù)據(jù),可以使用不同的虛擬主機做出邏輯上的區(qū)分.本章節(jié)就是進行進一步的封裝,同時實現(xiàn)一些消息隊列的API.這里需要注意的是在RabbitMq中,虛擬主機是可以隨便創(chuàng)建和刪除的,在本項目目前只是默認只有一個虛擬主機的存在,后續(xù)根據(jù)情況會進行擴展,這里也提前預留了對于多虛擬主機的管理的數(shù)據(jù)結(jié)構(gòu).保證了不同虛擬機中的交換機 隊列 綁定 消息都是相互隔離的.本項目全部代碼已上傳Gitee,鏈接放在文章末尾,歡迎大家訪問!
1. 創(chuàng)建VirtualHost
??????
注意: 這一塊比較重要也比較復雜,所以將代碼進行截圖加標注的形式進行總結(jié),完整的VirtualHost.class代碼會在講解完給出.
??????
1.1 定義虛擬主機的相關(guān)屬性
Router: 是用來定義交換機轉(zhuǎn)發(fā)的規(guī)則,主要實現(xiàn)的是對routingKey進行驗證以及判斷,具體的細節(jié)會在后面給出.
ConsumerManager: 實現(xiàn)的是管理消費者進行消費.
以上兩者就是鎖對象了,后續(xù)我們要對硬盤和內(nèi)存進行數(shù)據(jù)的讀寫,為了保證操作的原子性,以及線程安全我們會給相關(guān)操作進行加鎖.?
1.2 VirtualHost 構(gòu)造方法?
主要就是傳入虛擬主機的名字,對該虛擬主機的數(shù)據(jù)庫以及文件信息進行初始化,主要是對數(shù)據(jù)庫進行初始化.具體DataBaseManager.init()
初始化內(nèi)容如下:
?初始化完成,將硬盤中的數(shù)據(jù)恢復到內(nèi)存中
至此前置工作就差不多了.下面對一些重要的方法進行創(chuàng)建.
1.3 交換機和隊列的創(chuàng)建和刪除
1.3.1 交換機操作
如果交換機不存在就進行創(chuàng)建,存在就直接返回(ExchangeDeclare)
- 1. 更改交換機的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字(更加方便后續(xù)的管理)
- 2. 判斷交換機是否存在: 存在直接返回true即可,不存在就直接創(chuàng)建新的交換機即可.設置交換機的屬性,根據(jù)是否持久化寫入到硬盤,然后在寫入到內(nèi)存.這里需要注意的是,我們一定要先寫硬盤再寫內(nèi)存,因為些硬盤是一個失敗率很高的事情,經(jīng)常會因為文件權(quán)限問題導致數(shù)據(jù)寫入不進去.如果先寫內(nèi)存,而硬盤寫入不進去,就還需要堆內(nèi)存的數(shù)據(jù)進行刪除,這就很繁瑣了.
- 3. 以上整個操作是對交換機進行讀寫操作,為了保證線程安全,我們進行加鎖操作.
/**
* 1. 創(chuàng)建交換機
* 如果交換機不存在就進行創(chuàng)建,存在就直接返回
*/
// 創(chuàng)建交換機
// 如果交換機不存在, 就創(chuàng)建. 如果存在, 直接返回.
// 返回值是 boolean. 創(chuàng)建成功, 返回 true. 失敗返回 false
public boolean exchangeDeclare(String exchangeName,
ExchangeType exchangeType,
boolean durable,
boolean autoDelete,
Map<String, Object> arguments) {
// 1. 更改交換機的名字 交換機的名字 = 虛擬主機 + 交換機
exchangeName = virtualHostName + exchangeName;
try{
synchronized (exchangeLocker){
// 2. 判定該交換機是否存在
Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
if (existsExchange != null){
System.out.println("[VirtualHost] 交換機已經(jīng)存在!");
return true;
}
// 3. 不存在,直接進行創(chuàng)建新的交換機
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arguments);
// 4. 將構(gòu)造好的交換機進行寫入硬盤(含有持久化信息的交換機) 先寫硬盤后寫內(nèi)存
if (durable){
diskDataCenter.insertExchange(exchange);
}
// 5. 將交換機寫入到內(nèi)存中
memoryDataCenter.insertExchange(exchange);
System.out.println("[VirtualHost] 交換機創(chuàng)建完成! exchangeName="+exchangeName);
// 上述操作為什么不先寫內(nèi)存后寫硬盤?
// 因為寫硬盤操作比較容易出現(xiàn)異常,如果寫入硬盤失敗,寫入內(nèi)存成功,再進行從內(nèi)存中進行刪除就比較麻煩了
}
return true;
} catch (Exception e){
System.out.println("[VirtualHost] 交換機創(chuàng)建失敗! exchangeName="+exchangeName);
e.printStackTrace();
return false;
}
}
刪除交換機
- 1. 更改交換機的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字(更加方便后續(xù)的管理)
- 2. 根據(jù)交換機的名字得到交換機對象,判斷交換機是否為空,不為空進行刪除操作,還是先進行刪除硬盤的數(shù)據(jù),再刪除內(nèi)存中數(shù)據(jù)
- 3. 以上整個操作是對交換機進行讀寫操作,為了保證線程安全,我們進行加鎖操作.
/**
* 2.刪除交換機
* @param exchangeName 交換機名字
* @return
*/
public boolean exchangeDelete(String exchangeName) {
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
// 1. 先找到對應的交換機.
Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
if (toDelete == null) {
throw new MqException("[VirtualHost] 交換機不存在無法刪除!");
}
// 2. 刪除硬盤上的數(shù)據(jù)
if (toDelete.isDurable()) {
diskDataCenter.deleteExchange(exchangeName);
}
// 3. 刪除內(nèi)存中的交換機數(shù)據(jù)
memoryDataCenter.deleteExchange(exchangeName);
System.out.println("[VirtualHost] 交換機刪除成功! exchangeName=" + exchangeName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 交換機刪除失敗! exchangeName=" + exchangeName);
e.printStackTrace();
return false;
}
}
1.3.2 隊列操作?
針對隊列創(chuàng)建和刪除操作,這里就不做過多的解釋了,過程跟上述交換機的操作一樣. 下面給出代碼:
/**
* 3. 創(chuàng)建隊列
* @param queueName 隊列名
* @param durable 持久化
* @param exclusive 隊列獨有
* @param autoDelete 自動刪除
* @param arguments 其他聲明
* @return
*/
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) {
// 把隊列的名字, 給拼接上虛擬主機的名字.
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 判定隊列是否存在
MSQueue existsQueue = memoryDataCenter.getQueue(queueName);
if (existsQueue != null) {
System.out.println("[VirtualHost] 隊列已經(jīng)存在! queueName=" + queueName);
return true;
}
// 2. 創(chuàng)建隊列對象
MSQueue queue = new MSQueue();
queue.setName(queueName);
queue.setDurable(durable);
queue.setExclusive(exclusive);
queue.setAutoDelete(autoDelete);
queue.setArguments(arguments);
// 3. 寫硬盤
if (durable) {
diskDataCenter.insertQueue(queue);
}
// 4. 寫內(nèi)存
memoryDataCenter.insertQueue(queue);
System.out.println("[VirtualHost] 隊列創(chuàng)建成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 隊列創(chuàng)建失敗! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
/**
* 4. 刪除隊列
* @param queueName 隊列名
* @return
*/
public boolean queueDelete(String queueName) {
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 根據(jù)隊列名字, 查詢下當前的隊列對象
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 隊列不存在! 無法刪除! queueName=" + queueName);
}
// 2. 刪除硬盤數(shù)據(jù)
if (queue.isDurable()) {
diskDataCenter.deleteQueue(queueName);
}
// 3. 刪除內(nèi)存數(shù)據(jù)
memoryDataCenter.deleteQueue(queueName);
System.out.println("[VirtualHost] 刪除隊列成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 刪除隊列失敗! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
1.4 綁定的創(chuàng)建和刪除
- 1. 更改交換機和隊列的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字??隊列名字 = 虛擬主機的名字 + 隊列的名字
- 2. 根據(jù)交換機和隊列的名字得到綁定信息的對象,判斷綁定是否為空,不為空拋出異常
- 3. 綁定對象為空: 1, 判斷綁定的bindingKey是否合法. 2.合法就創(chuàng)建綁定對象,設置響應的綁定屬性.
- 4.?獲取一下對應的交換機和隊列. 如果交換機或者隊列不存在, 這樣的綁定也是無法創(chuàng)建的.
- 5. 寫入硬盤,再寫內(nèi)存
- 6. 以上整個操作是對交換機和隊列進行讀寫操作,為了保證線程安全,我們進行加鎖操作.
?這一步我們在Router進行設置一個方法,等下面更加詳細的介紹router類.
/**
* 5. 創(chuàng)建綁定
* @param queueName 隊列名字
* @param exchangeName 交換機名字
* @param bindingKey 綁定規(guī)則
* @return
*/
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
// 1. 轉(zhuǎn)換交換機和隊列的名字
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
synchronized (queueLocker){
// 2. 判斷交換機和隊列是否已經(jīng)綁定成功
Binding existBinding = memoryDataCenter.getBinding(exchangeName,queueName);
if (existBinding != null){
throw new MqException("[VirtualHost] binding 已經(jīng)存在! queueName=" + queueName+ ", exchangeName=" + exchangeName);
}
// 3. 驗證bing中的bindingKey 是否合法
if (!router.checkBindingKey(bindingKey)){
throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);
}
// 4. 創(chuàng)建綁定對象
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
// 5. 獲取對應的交換機和隊列,判斷是否是存在的
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 隊列不存在! queueName=" + queueName);
}
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交換機不存在! exchangeName=" + exchangeName);
}
// 5. 先寫硬盤
if (queue.isDurable() && exchange.isDurable()) {
diskDataCenter.insertBinding(binding);
}
// 6. 寫入內(nèi)存
memoryDataCenter.insertBinding(binding);
System.out.println("[VirtualHost] 綁定創(chuàng)建成功! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
}
return true;
}
} catch (MqException e) {
System.out.println("[VirtualHost] 綁定創(chuàng)建失敗! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
e.printStackTrace();
return false;
}
}
刪除綁定?
- 1. 更改交換機和隊列的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字??隊列名字 = 虛擬主機的名字 + 隊列的名字
- 2. 根據(jù)交換機和隊列的名字得到綁定信息的對象,判斷綁定是否為空,為空拋出異常
- 3. 從硬盤進行刪除,從內(nèi)存進行刪除
- 4. 以上整個操作是對交換機和隊列進行讀寫操作,為了保證線程安全,我們進行加鎖操作.這里需要注意的是,我們對交換機和隊列進行加鎖的時候,順序要和創(chuàng)建綁定的順序是一致的.不然會出現(xiàn)死鎖的現(xiàn)象.
/**
* 6. 刪除綁定
* @param queueName 隊列名
* @param exchangeName 交換機名字
* @return
*/
public boolean queueUnbind(String queueName, String exchangeName) {
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
synchronized (queueLocker) {
// 1. 獲取 binding 看是否已經(jīng)存在~
Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
if (binding == null) {
throw new MqException("[VirtualHost] 刪除綁定失敗! 綁定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);
}
// 2. 無論綁定是否持久化了, 都嘗試從硬盤刪一下. 就算不存在, 這個刪除也無副作用.
diskDataCenter.deleteBinding(binding);
// 3. 刪除內(nèi)存的數(shù)據(jù)
memoryDataCenter.deleteBinding(binding);
System.out.println("[VirtualHost] 刪除綁定成功!");
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 刪除綁定失敗!");
e.printStackTrace();
return false;
}
}
1.5?發(fā)送消息到指定的隊列/交換機
發(fā)布消息其實就是把消息發(fā)送到指定的交換機中,然后根據(jù)綁定關(guān)系發(fā)送到指定的隊列
- 1. 更改交換機和隊列的名字: 交換機名字 = 虛擬主機的名字 + 交換機的名字??隊列名字 = 虛擬主機的名字 + 隊列的名字?
- 2. 檢查消息的routingKey是否合法,不合法拋出異常
- 3. 根據(jù)傳入的交換機的名字進行查找交換機對象,然后判斷交換機的類型,而進行下一步的行為.
- 4. 如果交換機類型為DIRECT,則表示為直接交換機,則把routingKey作為隊列的名字,先進行根據(jù)傳入的參數(shù),創(chuàng)建消息對象,然后按照剛才組合好的隊列名字進行查找隊列,查找隊列進行發(fā)送消息,沒查找進行拋出異常.發(fā)送消息的時候判斷消息是否是持久化的,是持久化就往硬盤中寫入,否則只寫內(nèi)存就可以.發(fā)送完消息之后,要進行重要的操作.通知消費者進行消費消息.這一塊是在管理消費者進行消費消息實現(xiàn)的.
- 5. 如果交換機類型為Fanout 或者 Topic 我們需要在Router中進行設置相應的路由規(guī)則.
/**
* 9. 發(fā)送消息到指定的隊列/交換機
*/
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
try {
// 1. 轉(zhuǎn)換交換機的名字
exchangeName = virtualHostName + exchangeName;
// 2. 檢查 routingKey 是否合法.
if (!router.checkRoutingKey(routingKey)) {
throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
}
// 3. 查找交換機對象
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交換機不存在! exchangeName=" + exchangeName);
}
// 4. 判定交換機的類型
if (exchange.getType() == ExchangeType.DIRECT) {
// 按照直接交換機的方式來轉(zhuǎn)發(fā)消息
// 以 routingKey 作為隊列的名字, 直接把消息寫入指定的隊列中.
// 此時, 可以無視綁定關(guān)系.
String queueName = virtualHostName + routingKey;
// 5. 構(gòu)造消息對象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 6. 查找該隊列名對應的對象
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 隊列不存在! queueName=" + queueName);
}
// 7. 隊列存在, 直接給隊列中寫入消息
sendMessage(queue, message);
} else {
// 按照 fanout 和 topic 的方式來轉(zhuǎn)發(fā).
// 5. 找到該交換機關(guān)聯(lián)的所有綁定, 并遍歷這些綁定對象
ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
// 1) 獲取到綁定對象, 判定對應的隊列是否存在
Binding binding = entry.getValue();
MSQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if (queue == null) {
// 此處咱們就不拋出異常了. 可能此處有多個這樣的隊列.
// 希望不要因為一個隊列的失敗, 影響到其他隊列的消息的傳輸.
System.out.println("[VirtualHost] basicPublish 發(fā)送消息時, 發(fā)現(xiàn)隊列不存在! queueName=" + binding.getQueueName());
continue;
}
// 2) 構(gòu)造消息對象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 3) 判定這個消息是否能轉(zhuǎn)發(fā)給該隊列.
// 如果是 fanout, 所有綁定的隊列都要轉(zhuǎn)發(fā)的.
// 如果是 topic, 還需要判定下, bindingKey 和 routingKey 是不是匹配.
if (!router.route(exchange.getType(), binding, message)) {
continue;
}
// 4) 真正轉(zhuǎn)發(fā)消息給隊列
sendMessage(queue, message);
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 消息發(fā)送失敗!");
e.printStackTrace();
return false;
}
}
private void sendMessage(MSQueue queue, Message message) throws IOException, MqException, InterruptedException {
// 此處發(fā)送消息, 就是把消息寫入到 硬盤 和 內(nèi)存 上. 根據(jù)此條消息時是否要進行持久化進行判斷
int deliverMode = message.getDeliverMode();
// deliverMode 為 1 , 不持久化. deliverMode 為 2 表示持久化.
if (deliverMode == 2) {
diskDataCenter.sendMessage(queue, message);
}
// 寫入內(nèi)存
memoryDataCenter.sendMessage(queue, message);
// 此處還需要補充一個邏輯, 通知消費者可以消費消息了.
consumerManager.notifyConsume(queue.getName());
}
2. 實現(xiàn)路由規(guī)則Router
?這個類我們實現(xiàn)具體的路由轉(zhuǎn)發(fā)規(guī)則,對之前還沒實現(xiàn)的方法進行實現(xiàn).還未實現(xiàn)的方法具體如下:
1. 在創(chuàng)建綁定的時候我們對bindingKey進行驗證是否合法checkBindingKey();
2. 在往交換機進行發(fā)送消息的時候,我們對消息的routingKey進行驗證\checkRoutingKey();
3. 當消息插入到交換機之后,根據(jù)交換機的主題往隊列中分發(fā)消息的時候.對不同主題的交換機實現(xiàn)不同的路由規(guī)則route();
以上是我們在虛擬主機類中還沒有進行實現(xiàn)的方法.下面進行一一實現(xiàn):
2.1?checkBindingKey()
?以下是我們合法的BindingKey的規(guī)則
/**
* 驗證bindingKey是否是合法的
* 1. 數(shù)字, 字母, 下劃線
* 2. 使用 . 分割成若干部分
* 3. 允許存在 * 和 # 作為通配符. 但是通配符只能作為獨立的分段.
* @return
*/
public boolean checkBindingKey(String bindingKey){
if (bindingKey.length() == 0) {
// 空字符串, 也是合法情況. 比如在使用 direct / fanout 交換機的時候, bindingKey 是用不上的.
return true;
}
// 檢查字符串中不能存在非法字符
for (int i = 0; i < bindingKey.length(); i++) {
char ch = bindingKey.charAt(i);
if (ch >= 'A' && ch <= 'Z') {
continue;
}
if (ch >= 'a' && ch <= 'z') {
continue;
}
if (ch >= '0' && ch <= '9') {
continue;
}
if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
continue;
}
return false;
}
// 檢查 * 或者 # 是否是獨立的部分.
// aaa.*.bbb 合法情況; aaa.a*.bbb 非法情況.
String[] words = bindingKey.split("\\.");
for (String word : words) {
// 檢查 word 長度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
return false;
}
}
// 約定一下, 通配符之間的相鄰關(guān)系(人為約定的).
// 為啥這么約定? 因為前三種相鄰的時候, 實現(xiàn)匹配的邏輯會非常繁瑣, 同時功能性提升不大~~
// 1. aaa.#.#.bbb => 非法
// 2. aaa.#.*.bbb => 非法
// 3. aaa.*.#.bbb => 非法
// 4. aaa.*.*.bbb => 合法
for (int i = 0; i < words.length - 1; i++) {
// 連續(xù)兩個 ##
if (words[i].equals("#") && words[i + 1].equals("#")) {
return false;
}
// # 連著 *
if (words[i].equals("#") && words[i + 1].equals("*")) {
return false;
}
// * 連著 #
if (words[i].equals("*") && words[i + 1].equals("#")) {
return false;
}
}
return true;
}
2.2?checkRoutingKey()
驗證routingKey是合法的.routingKey是與BindingKey進行匹配的,所以必須是具體的.????????
?
/**
* 驗證routingKey是否是合法的
* 1. 數(shù)字, 字母, 下劃線
* 2. 使用 . 分割成若干部分
* @return
*/
public boolean checkRoutingKey(String routingKey){
if (routingKey.length() == 0){
// 空字符串,合法的情況 當交換機的類型為fanout的時候,是不需要的,所以可以設置為""
return true;
}
for (int i = 0; i < routingKey.length(); i++) {
char ch = routingKey.charAt(i);
// 判定該字符是否是大寫字母
if (ch >= 'A' && ch <= 'Z') {
continue;
}
// 判定該字母是否是小寫字母
if (ch >= 'a' && ch <= 'z') {
continue;
}
// 判定該字母是否是阿拉伯數(shù)字
if (ch >= '0' && ch <= '9') {
continue;
}
// 判定是否是 _ 或者 .
if (ch == '_' || ch == '.') {
continue;
}
// 該字符, 不是上述任何一種合法情況, 就直接返回 false
return false;
}
// 把每個字符都檢查過, 沒有遇到非法情況. 此時直接返回 true
return true;
}
2.3?route()
判斷交換機的類型進而得出是否可以進行給隊列進行轉(zhuǎn)發(fā)消息.
1. 交換機的類型為fanout.代表給交換機進行綁定的所有隊列進行轉(zhuǎn)發(fā)消息.
2. 交換機的類型為Topic,需要對routingKey進行判斷.進而設置給隊列轉(zhuǎn)發(fā)消息
/**
* 判斷是否可以給綁定的交換機進行轉(zhuǎn)發(fā)消息
* @return
*/
public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {
if (type == ExchangeType.FANOUT){
// 如果交換機類型為 fan-out 就直接進行返回true,表示轉(zhuǎn)發(fā)給當前當前綁定的所有對列
return true;
}else if(type == ExchangeType.TOPIC){
// 如果是主題交換機,規(guī)則就比較復雜
return routerTopic(binding,message);
}else {
throw new MqException("[Router] 交換機類型有誤 exchangeType=" + type);
}
}
?對于主題交換機,我們進行詳細的講解.
- 1.?將bindingKey 和 routingKey 進行按照"."進行分割成字符串數(shù)組
- 2. 定義下標進行遍歷數(shù)組
- 3. 遍歷兩個數(shù)組,主要分為5種情況.
- 3.1? 當bindingKey遇到*號時直接跳過*,兩個下標都進行自增1
- 3.2 當bindingKey遇到#號,如果此時#號是bindingKey的最后一位,那么直接返回true
- 3.3 當bindingKey遇到#號,如果此時#號不是最后一位,就去匹配#號下一位在routingKey的部分,匹配到了就將routingIndex指到匹配的位置,進而在進行上述循環(huán),如果沒匹配到就返回false
- 3.4?此時沒有遇見通配符,所有的內(nèi)容部都要進行匹配上,匹配不上就返回false
- 3.5 最后判斷此時兩個數(shù)組的下標是否都比較到了末尾.比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失敗的
/**
* 用來實現(xiàn):topic類型的交換機的轉(zhuǎn)發(fā)規(guī)則
* @param binding 綁定信息對象
* @param message 消息對象
* @return
*/
private boolean routerTopic(Binding binding, Message message) {
// 1. 將bindingKey 和 routingKey 進行按照"."進行分割
String[] bindingTokens = binding.getBindingKey().split("\\.");
String[] routingTokens = message.getRoutingKey().split("\\.");
// 2. 定義用來遍歷數(shù)組的下標
int bindingIndex = 0;
int routingIndex = 0;
// 3. 進行遍歷兩個數(shù)組
while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length){
if (bindingTokens[bindingIndex].equals("*")){
// (1.)遇到*號兩個下標直接跳過 * 可以匹配一個部分
bindingIndex++;
routingIndex++;
}else if (bindingTokens[bindingIndex].equals("#")){
bindingIndex += 1;
// (2.)遇到#號 # 可以匹配多個部分
if (bindingIndex == bindingTokens.length){
// (3.)當遇到#號,#號的下標為最后一個元素的時候,直接返回true,因為可以直接匹配后面所有的內(nèi)容
return true;
}else {
// (4.)當遇到#號,后面后還有內(nèi)容的時候,就去匹配#號下一個部分在routingKey的部分,
// 匹配了就直接將bindingIndex指到bindingTokens下一個部分,同時將routingIndex指到匹配的地方
// 沒匹配配到就返回false
routingIndex = findNextMatch(routingIndex,routingTokens,bindingTokens[bindingIndex]);
if (routingIndex == -1){
return false;
}
bindingIndex++;
routingIndex++;
}
}else {
// (5.)此時沒有遇見通配符,所有的內(nèi)容部都要進行匹配上
if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){
return false;
}
bindingIndex++;
routingIndex++;
}
}
// (6.)最后判斷此時兩個數(shù)組的下標是否都比較到了末尾
// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失敗的
if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
return true;
}
return false;
}
/**
* 給定起始下標去在一個數(shù)組中尋找指定數(shù)組元素,找到就返回該元素在數(shù)組的下標,沒找到就返回-1;
* @param routingIndex 起始下標
* @param routingTokens 目標數(shù)組
* @param bindingToken 目標元素
* @return
*/
private int findNextMatch(int routingIndex, String[] routingTokens, String bindingToken) {
for (int i = routingIndex; i < routingTokens.length; i++) {
if (routingTokens[i].equals(bindingToken)){
return i;
}
}
return -1;
}
以上就是整個Router的所有方法.我們對上述代碼進行單元測試.
2.4 單元測試
?
?
package com.example.demo.mqserver.core;
import com.example.demo.common.MqException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
/**
* Created with IntelliJ IDEA.
* Description:測試交換機的轉(zhuǎn)發(fā)規(guī)則(交換機類型為topic)
* User: YAO
* Date: 2023-08-01
* Time: 13:56
*/
@SpringBootTest
class RouterTest {
private Router router = new Router();
private Binding binding = null;
private Message message = null;
@BeforeEach
public void setUp() {
binding = new Binding();
message = new Message();
}
@AfterEach
public void tearDown() {
binding = null;
message = null;
}
/**
* [測試用例]
* binding key routing key result
* aaa aaa true
* aaa.bbb aaa.bbb true
* aaa.bbb aaa.bbb.ccc false
* aaa.bbb aaa.ccc false
* aaa.bbb.ccc aaa.bbb.ccc true
* aaa.* aaa.bbb true
* aaa.*.bbb aaa.bbb.ccc false
* *.aaa.bbb aaa.bbb false
* # aaa.bbb.ccc true
* aaa.# aaa.bbb true
* aaa.# aaa.bbb.ccc true
* aaa.#.ccc aaa.ccc true
* aaa.#.ccc aaa.bbb.ccc true
* aaa.#.ccc aaa.aaa.bbb.ccc true
* #.ccc ccc true
* #.ccc aaa.bbb.ccc true
*/
@Test
public void test1() throws MqException {
binding.setBindingKey("aaa");
message.setRoutingKey("aaa");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test2() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test3() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test4() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test5() throws MqException {
binding.setBindingKey("aaa.bbb.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test6() throws MqException {
binding.setBindingKey("aaa.*");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test7() throws MqException {
binding.setBindingKey("aaa.*.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test8() throws MqException {
binding.setBindingKey("*.aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test9() throws MqException {
binding.setBindingKey("#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test10() throws MqException {
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test11() throws MqException {
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test12() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test13() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test14() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test15() throws MqException {
binding.setBindingKey("#.ccc");
message.setRoutingKey("ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test16() throws MqException {
binding.setBindingKey("#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
}
?單元測試通過.
3. 訂閱消息
????????在我們的虛擬主機中進行添加方法完成消息的訂閱.要想完成消息的訂閱,就需要在消息隊列中新建一個列表consumerEnvList用來存儲消費者的信息,當有消息進行存儲到隊列的時候,此時選出消費者進行消費消息.而消費者消費信息的這個環(huán)境需要單獨定義一個類ConsumerEnv進行描述.以上這個消費信息的過程我們定義一個類ConsumerManager進行管理這些邏輯.
3.1 添加一個訂閱者
給隊列添加消費者,當隊列接收到消息的時候,就要將消息推送給訂閱者
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
// 構(gòu)造一個 ConsumerEnv 對象, 把這個對應的隊列找到, 再把這個 Consumer 對象添加到該隊列中.
queueName = virtualHostName + queueName;
try {
consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] basicConsume 失敗! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
此處插入的參數(shù)Consumer相當于一個回調(diào)函數(shù),就是一個函數(shù)式接口.我們在common中進行定義Consumer
@FunctionalInterface
public interface Consumer {
// Delivery 的意思是 "投遞", 這個方法預期是在每次服務器收到消息之后, 來調(diào)用.
// 通過這個方法把消息推送給對應的消費者.
// (注意! 這里的方法名和參數(shù), 也都是參考 RabbitMQ 展開的)
void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException;
}
定義這個回調(diào)函數(shù)表示:收到消息之后要對消息進行處理.
3.2 創(chuàng)建訂閱者管理類ConsumerManager
1.??這個類是和虛擬主機是一一對應的,每個虛擬主機都有一個管理消費者的對象,而管理的消費者的對象對應的是與之對應的.
2. 我們采用一個堵塞隊列來記錄收到消息的的隊列名字,每次隊列收到消息,就會往這個隊列中進行添加隊列的名字,然后后續(xù)進行通知這個隊列的消費者進行消費消息.
3. 單獨使用一個線程池用來執(zhí)行消息的回調(diào).(主要是獲取到消息之后,給響應設置消息的屬性與消息本體發(fā)送給客戶端.)
4. 我們設置一個掃描線程,從堵塞隊列不斷地取出元素,進而找到隊列,在這個隊列進行消費消息,并且設置掃描線程為后臺線程,這樣就不會阻止進程的結(jié)束.
public class ConsumerManager {
// 1. 持有虛擬主機對象的引用,用來操作數(shù)據(jù)
private VirtualHost parent;
// 2. 指定一個線程池,負責執(zhí)行具體的回調(diào)任務
private ExecutorService workPool = Executors.newCachedThreadPool();
// 3. 存放令牌的隊列,存放接收到消息的隊列名字(堵塞隊列)
// 當這個堵塞隊列一接收到隊列的名字,掃描線程就會就會找到虛擬主機,然后找到這個隊列,進而消費消息
private BlockingQueue<String> tokenQueue = new LinkedBlockingDeque<>();
// 4. 掃描線程 (關(guān)注令牌隊列中添加了哪些隊列的名字,就知道哪些隊列添加了消息,取出消息,進而交給線程池,進行消費這些消息)
private Thread scannerThread = null;
}
1. 給堵塞隊列設置接口,供虛擬主機進行調(diào)用.
/**
* 1. 收到消息,通知消費者進行消費消息(將消息對應的隊列名字添加到堵塞隊列中)
*/
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
2. 實現(xiàn)掃描線程
public ConsumerManager(VirtualHost p) {
parent = p;
scannerThread = new Thread(() -> {
while (true) {
try {
// 1. 拿到令牌
String queueName = tokenQueue.take();
// 2. 根據(jù)令牌, 找到隊列
MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 取令牌后發(fā)現(xiàn), 該隊列名不存在! queueName=" + queueName);
}
// 3. 從這個隊列中消費一個消息.
synchronized (queue) {
consumeMessage(queue);
}
} catch (InterruptedException | MqException e) {
e.printStackTrace();
}
}
});
// 把線程設為后臺線程.
// 后臺線程不會影響進程的結(jié)束
scannerThread.setDaemon(true);
scannerThread.start();
}
3. 添加消費者環(huán)境ConsumerEnv到指定的隊列
我們在common中實現(xiàn)這個類
@Data
public class ConsumerEnv {
// 1. 消費者的身份標識
private String consumerTag;
// 2. 消費者消費隊列的名字
private String queueName;
// 3. 是否自動應答
private boolean autoAck;
// 4. 通過這個回調(diào)函數(shù)來處理收到的消息.
private Consumer consumer;
public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
this.consumerTag = consumerTag;
this.queueName = queueName;
this.autoAck = autoAck;
this.consumer = consumer;
}
}
(1) 按照指定的隊列名找到這個類.
(2)?創(chuàng)建消費者環(huán)境對象,進行添加,同時如果這個隊列的消息存在,就需要進行消費這些信息,調(diào)用consumeMessage()方法傳入隊列的名字.
/**
* 2. 新增Consumer對象到指定的對列
*/
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
// 找到對應的隊列.
MSQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 隊列不存在! queueName=" + queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
synchronized (queue) {
queue.addConsumerEnv(consumerEnv);
// 如果當前隊列中已經(jīng)有了一些消息了, 需要立即就消費掉.
int n = parent.getMemoryDataCenter().getMessageCount(queueName);
for (int i = 0; i < n; i++) {
// 這個方法調(diào)用一次就消費一條消息.
consumeMessage(queue);
}
}
}
4. 消費消息 consumeMessage()
(1) 因為一個隊列中可能會有多個消費者,我們按照輪詢的方式進行挑選消費者進行消費消息,在隊列的類中,設置方法chooseConsumer()
/**
* 挑選訂閱者 進行消費隊列中的消息 (輪詢的方式)
* @return
*/
public ConsumerEnv chooseConsumer(){
// 1. 如果當前隊列對應的消費者的數(shù)量為0,直接返回null,表示沒有篩選到消費者
if (consumerEnvList.size() == 0){
return null;
}
// 2. 使用當前訂閱到的下標進行對消費者列表取模,然后進行挑選消費者記性消費消息,實現(xiàn)消息的輪詢消費
int index = consumerSeq.get() % consumerEnvList.size();
consumerSeq.getAndIncrement();
return consumerEnvList.get(index);
}
(2) 從隊列中取出消息
(3) 把消息帶入到回調(diào)方法,交給線程池進行執(zhí)行
/**
* 消費者進行消費信息
* @param queue
*/
private void consumeMessage(MSQueue queue) {
// 1. 按照輪詢的方式, 找個消費者出來.
ConsumerEnv luckyDog = queue.chooseConsumer();
if (luckyDog == null) {
// 當前隊列沒有消費者, 暫時不消費. 等后面有消費者出現(xiàn)再說.
return;
}
// 2. 從隊列中取出一個消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
System.out.println(message);
if (message == null) {
// 當前隊列中還沒有消息, 也不需要消費.
return;
}
// 3. 把消息帶入到消費者的回調(diào)方法中, 丟給線程池執(zhí)行.
workPool.submit(() -> {
try {
// 1. 把消息放到待確認的集合中. 這個操作勢必在執(zhí)行回調(diào)之前.
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
// 2. 真正執(zhí)行回調(diào)操作
luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());
// 3. 如果當前是 "自動應答" , 就可以直接把消息刪除了.
// 如果當前是 "手動應答" , 則先不處理, 交給后續(xù)消費者調(diào)用 basicAck 方法來處理.
if (luckyDog.isAutoAck()) {
// 此時是自動應答,表示直接刪除
// 1) 刪除硬盤上的消息
if (message.getDeliverMode() == 2) {
parent.getDiskDataCenter().deleteMessage(queue, message);
}
// 2) 刪除上面的待確認集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageID());
// 3) 刪除內(nèi)存中消息中心里的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageID());
System.out.println("[ConsumerManager] 消息被成功消費! queueName=" + queue.getName());
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
3.3 訂閱消息小結(jié)
?4. 消息確認?basicAck()
?此處是消費者在回調(diào)函數(shù)中對消息進行處理之后再回調(diào)函數(shù)中執(zhí)行的.
- 1. 獲取要刪除消息以及所在隊列的對象
- 2. 刪除硬盤和內(nèi)存的數(shù)據(jù)
- 3. 刪除未確認消息集合的數(shù)據(jù)
/**
* 消費者消費完消息進行手動應答
* @return
*/
public boolean basicAck(String queueName, String messageId){
queueName = virtualHostName + queueName;
try {
// 1. 獲取要刪除消息以及所在隊列的對象
Message message = memoryDataCenter.getMessage(messageId);
if (message == null){
throw new MqException("[VirtualHost] 確認的信息不存在 messageId="+messageId);
}
MSQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null){
throw new MqException("[VirtualHost] 確認的隊列不存在 queueName="+queueName);
}
// 2
// 1.)刪除硬盤中的數(shù)據(jù)
if(message.getDeliverMode() == 2){
diskDataCenter.deleteMessage(queue,message);
}
// 2.) 刪除消息中心的消息
memoryDataCenter.removeMessage(message.getMessageID());
// 3.) 刪除委未確認消息集合的消息
memoryDataCenter.removeMessageWaitAck(queue.getName(),message.getMessageID());
System.out.println("[VirtualHost] basicAck成功 消息被確認成功 queueName=" + queueName
+ ",messageId:." + messageId);
return true;
} catch (MqException | ClassNotFoundException | IOException e) {
e.printStackTrace();
System.out.println("[VirtualHost] basicAck失敗 消息被確認失敗 queueName=" + queueName
+ ",messageId:." + messageId);
return false;
}
}
至此以上就是VirtualHost的全部內(nèi)容,內(nèi)容很多,很繁瑣需要,靜下心來仔細的體會.
5. VirtualHost單元測試
?
package com.example.demo.mqserver;
import ch.qos.logback.core.util.FileUtil;
import com.example.demo.DemoApplication;
import com.example.demo.common.Consumer;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.*;
/**
* Created with IntelliJ IDEA.
* Description:虛擬主機的操作測試
* User: YAO
* Date: 2023-08-01
* Time: 18:26
*/
class VirtualHostTest {@Autowired
public VirtualHost virtualHost = null;
@BeforeEach
void setUp() {
DemoApplication.context = SpringApplication.run(DemoApplication.class);
// 創(chuàng)建好虛擬主機對象
virtualHost = new VirtualHost("default");
}
@AfterEach
void tearDown() throws IOException {
DemoApplication.context.close();
//把硬盤的目錄進行刪除
File dataDir = new File("./data");
FileUtils.deleteDirectory(dataDir);
}
@Test
void exchangeDeclare() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
Assertions.assertTrue(ok);
}
@Test
void exchangeDelete() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.exchangeDelete("testExchange");
Assertions.assertTrue(ok);
}
@Test
void queueDeclare() {
boolean ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
Assertions.assertTrue(ok);
}
@Test
void queueDelete() {
boolean ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.queueDelete("testQueue");
Assertions.assertTrue(ok);
}
@Test
void queueBind() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.queueBind("testQueue","testExchange",
"testBindingKey");
Assertions.assertTrue(ok);
}
@Test
void queueUnbind() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.queueBind("testQueue","testExchange",
"testBindingKey");
ok = virtualHost.queueUnbind("testQueue","testExchange");
Assertions.assertTrue(ok);
}
@Test
void basicPublish() {
boolean ok = virtualHost.exchangeDeclare("testExchange",
ExchangeType.DIRECT,true,false,null);
ok = virtualHost.queueDeclare("testQueue",
true,false,false,null);
ok = virtualHost.basicPublish("testExchange","testQueue",null,"Hello".getBytes(StandardCharsets.UTF_8));
Assertions.assertTrue(ok);
}
/**
* 1. 先訂閱, 后發(fā)布消息
*/
@Test
public void testBasicConsume1() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true,
false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true, false, null);
Assertions.assertTrue(ok);
// 先訂閱隊列
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消費者自身設定的回調(diào)方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
// 再發(fā)送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
}
/**
* 先發(fā)送消息, 后訂閱隊列.
*/
@Test
public void testBasicConsume2() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true,
false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true, false, null);
Assertions.assertTrue(ok);
// 先發(fā)送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
// 再訂閱隊列
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消費者自身設定的回調(diào)方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicConsumeFanout() throws InterruptedException {
// 創(chuàng)建一個交換機,并且綁定兩個隊列
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue1", "testExchange", "");
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue2", "testExchange", "");
Assertions.assertTrue(ok);
// 發(fā)布消息發(fā)到交換機
ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
Assertions.assertTrue(ok);
Thread.sleep(500);
// 兩個消費者訂閱上述的兩個隊列.
ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicConsumeTopic() throws InterruptedException {
// 1. 創(chuàng)建交換機(主題交換機)
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);
Assertions.assertTrue(ok);
// 2. 創(chuàng)建隊列
ok = virtualHost.queueDeclare("testQueue", false, false, false, null);
Assertions.assertTrue(ok);
// 3. 將交換機和隊列進行綁定(設置bindingKey)
ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
Assertions.assertTrue(ok);
// 4. 發(fā)布消息(設置routingKey)
ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
Assertions.assertTrue(ok);
// 5. 訂閱消息
ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicAck() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true,
false, false, null);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true, false, null);
Assertions.assertTrue(ok);
// 先發(fā)送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
// 再訂閱隊列 [要改的地方, 把 autoAck 改成 false]
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消費者自身設定的回調(diào)方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
// [要改的地方, 新增手動調(diào)用 basicAck]
boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
Assertions.assertTrue(ok);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
}
結(jié)語
? ? ? ? 本文將整個VirtualHost進行了實現(xiàn),實現(xiàn)了供BrokerServer調(diào)用的API.基礎的消息隊列框架已經(jīng)搭建好了,接下來就是搭建服務器和客戶端了.請持續(xù)關(guān)注,謝謝!!!
完整的項目代碼已上傳Gitee,歡迎大家訪問.??????文章來源:http://www.zghlxwxcb.cn/news/detail-643139.html
模擬實現(xiàn)消息隊列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq文章來源地址http://www.zghlxwxcb.cn/news/detail-643139.html
到了這里,關(guān)于模擬實現(xiàn)消息隊列項目(系列5) -- 服務器模塊(虛擬主機)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!