??前言
本次開發(fā)任務(wù)
- 實(shí)現(xiàn) BrokerServer 類,也就是咱們消息隊(duì)列的本體服務(wù)器。
其實(shí)本質(zhì)上就是一個(gè) TCP 的服務(wù)器。
??創(chuàng)建 BrokerServer 類
創(chuàng)建 BrokerServer 類如下:
public class BrokerServer {
// 當(dāng)前程序只考慮?個(gè)虛擬主機(jī)的情況.
private VirtualHost virtualHost = new VirtualHost("default-VirtualHost");
// key 為 channelId, value 為 channel 對(duì)應(yīng)的 socket 對(duì)象.
private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>
private ServerSocket serverSocket;
private ExecutorService executorService;
private volatile boolean runnable = true;
}
- virtualHost表示服務(wù)器持有的虛擬主機(jī).隊(duì)列,交換機(jī),綁定,消息都是通過虛擬主機(jī)管理.
- sessions ?來管理所有的客?端的連接. 記錄每個(gè)客戶端的 socket.
- serverSocket 是服務(wù)器自身的 socket
- executorService 這個(gè)線程池用來處理響應(yīng)
- runnable 這個(gè)標(biāo)志位用來控制服務(wù)器的運(yùn)行停?
??啟動(dòng)與停止服務(wù)器
代碼實(shí)現(xiàn)如下:文章來源:http://www.zghlxwxcb.cn/news/detail-854719.html
public BrokerServer(int port) throws IOException {
serverSocket = new ServerSocket(port);
}
public void start() throws IOException {
System.out.println("[BrokerServer] 啟動(dòng)!");
executorService = Executors.newCachedThreadPool();
try {
while (runnable) {
Socket clientSocket = serverSocket.accept();
// 把處理連接的邏輯丟給這個(gè)線程池.
executorService.submit(() -> {
processConnection(clientSocket);
});
}
} catch (SocketException e) {
System.out.println("[BrokerServer] 服務(wù)器停止運(yùn)行!");
// e.printStackTrace();
}
}
// 一般來說停止服務(wù)器, 就是直接 kill 掉對(duì)應(yīng)進(jìn)程就行了.
// 此處還是搞一個(gè)單獨(dú)的停止方法. 主要是用于后續(xù)的單元測(cè)試.
public void stop() throws IOException {
runnable = false;
// 把線程池中的任務(wù)都放棄了. 讓線程都銷毀.
executorService.shutdownNow();
serverSocket.close();
}
??實(shí)現(xiàn)處理連接
通過這個(gè)方法, 來處理一個(gè)客戶端的連接.
我們使用 InputStream
與 OutputStream
,由于后面要按照特定格式來讀取并解析.
此時(shí)就需要用到 DataInputStream
和 DataOutputStream
在這一個(gè)連接中, 可能會(huì)涉及到多個(gè)請(qǐng)求和響應(yīng),我們使用一個(gè)while(true)
來進(jìn)行實(shí)現(xiàn)
在此循環(huán)我們要做的事情有三件:
- 讀取請(qǐng)求并解析
- 根據(jù)請(qǐng)求計(jì)算響應(yīng)
- 把響應(yīng)寫回客戶端
具體處理邏輯,我們后面再仔細(xì)實(shí)現(xiàn),
那么我們?cè)趺唇Y(jié)束這個(gè)循環(huán)呢?
注意我們上面使用的是 DataInputStream
和 DataOutputStream
,當(dāng)沒有數(shù)據(jù)進(jìn)行讀取的時(shí)候,就會(huì)進(jìn)行拋出異常而結(jié)束循環(huán)
最后當(dāng)連接處理完了, 就需要記得關(guān)閉 socket, 一個(gè) TCP 連接中, 可能包含多個(gè) channel. 需要把當(dāng)前這個(gè) socket 對(duì)應(yīng)的所有 channel 也順便清理掉.
代碼實(shí)現(xiàn)如下:
// 通過這個(gè)方法, 來處理一個(gè)客戶端的連接.
// 在這一個(gè)連接中, 可能會(huì)涉及到多個(gè)請(qǐng)求和響應(yīng).
private void processConnection(Socket clientSocket) {
try (InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()) {
// 這里需要按照特定格式來讀取并解析. 此時(shí)就需要用到 DataInputStream 和 DataOutputStream
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
while (true) {
// 1. 讀取請(qǐng)求并解析.
Request request = readRequest(dataInputStream);
// 2. 根據(jù)請(qǐng)求計(jì)算響應(yīng)
Response response = process(request, clientSocket);
// 3. 把響應(yīng)寫回給客戶端
writeResponse(dataOutputStream, response);
}
}
} catch (EOFException | SocketException e) {
// 對(duì)于這個(gè)代碼, DataInputStream 如果讀到 EOF , 就會(huì)拋出一個(gè) EOFException 異常.
// 需要借助這個(gè)異常來結(jié)束循環(huán)
System.out.println("[BrokerServer] connection 關(guān)閉! 客戶端的地址: " + clientSocket.getInetAddress().toString()
+ ":" + clientSocket.getPort());
} catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[BrokerServer] connection 出現(xiàn)異常!");
e.printStackTrace();
} finally {
try {
// 當(dāng)連接處理完了, 就需要記得關(guān)閉 socket
clientSocket.close();
// 一個(gè) TCP 連接中, 可能包含多個(gè) channel. 需要把當(dāng)前這個(gè) socket 對(duì)應(yīng)的所有 channel 也順便清理掉.
clearClosedSession(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
??實(shí)現(xiàn) readRequest 與 writeResponse
關(guān)于讀取請(qǐng)求,我們前面定義了一個(gè)類為 Request ,此時(shí)我們構(gòu)造相應(yīng)的對(duì)象,并對(duì)該對(duì)象相應(yīng)屬性進(jìn)行填充即可。
代碼實(shí)現(xiàn)如下:
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if (n != request.getLength()) {
throw new IOException("讀取請(qǐng)求格式出錯(cuò)!");
}
request.setPayload(payload);
return request;
}
關(guān)于響應(yīng),實(shí)現(xiàn)相反,傳入的 響應(yīng)對(duì)象 相應(yīng)的屬性返回即可。
最后不要忘了刷新緩沖區(qū)
代碼實(shí)現(xiàn)如下:
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
// 這個(gè)刷新緩沖區(qū)也是重要的操作!!
dataOutputStream.flush();
}
??實(shí)現(xiàn)處理請(qǐng)求
先把請(qǐng)求轉(zhuǎn)換成 BaseArguments , 獲取到其中的 channelId 和 rid
再根據(jù)不同的 type, 分別處理不同的邏輯. (主要是調(diào)用virtualHost中不同的方法).
針對(duì)消息訂閱操作,則需要在存在消息的時(shí)候通過回調(diào),把響應(yīng)結(jié)果寫回給對(duì)應(yīng)的客?端.
最后構(gòu)造成統(tǒng)?的響應(yīng).
代碼實(shí)現(xiàn)如下:
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
// 1. 把 request 中的 payload 做一個(gè)初步的解析.
BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
+ ", type=" + request.getType() + ", length=" + request.getLength());
// 2. 根據(jù) type 的值, 來進(jìn)一步區(qū)分接下來這次請(qǐng)求要干啥.
boolean ok = true;
if (request.getType() == 0x1) {
// 創(chuàng)建 channel
sessions.put(basicArguments.getChannelId(), clientSocket);
System.out.println("[BrokerServer] 創(chuàng)建 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x2) {
// 銷毀 channel
sessions.remove(basicArguments.getChannelId());
System.out.println("[BrokerServer] 銷毀 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x3) {
// 創(chuàng)建交換機(jī). 此時(shí) payload 就是 ExchangeDeclareArguments 對(duì)象了.
ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());
} else if (request.getType() == 0x4) {
ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
ok = virtualHost.exchangeDelete(arguments.getExchangeName());
} else if (request.getType() == 0x5) {
QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),
arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());
} else if (request.getType() == 0x6) {
QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
ok = virtualHost.queueDelete((arguments.getQueueName()));
} else if (request.getType() == 0x7) {
QueueBindArguments arguments = (QueueBindArguments) basicArguments;
ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
} else if (request.getType() == 0x8) {
QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
} else if (request.getType() == 0x9) {
BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
arguments.getBasicProperties(), arguments.getBody());
} else if (request.getType() == 0xa) {
BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
new Consumer() {
// 這個(gè)回調(diào)函數(shù)要做的工作, 就是把服務(wù)器收到的消息可以直接推送回對(duì)應(yīng)的消費(fèi)者客戶端
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
// 先知道當(dāng)前這個(gè)收到的消息, 要發(fā)給哪個(gè)客戶端.
// 此處 consumerTag 其實(shí)是 channelId. 根據(jù) channelId 去 sessions 中查詢, 就可以得到對(duì)應(yīng)的
// socket 對(duì)象了, 從而可以往里面發(fā)送數(shù)據(jù)了
// 1. 根據(jù) channelId 找到 socket 對(duì)象
Socket clientSocket = sessions.get(consumerTag);
if (clientSocket == null || clientSocket.isClosed()) {
throw new MqException("[BrokerServer] 訂閱消息的客戶端已經(jīng)關(guān)閉!");
}
// 2. 構(gòu)造響應(yīng)數(shù)據(jù)
SubScribeReturns subScribeReturns = new SubScribeReturns();
subScribeReturns.setChannelId(consumerTag);
subScribeReturns.setRid(""); // 由于這里只有響應(yīng), 沒有請(qǐng)求, 不需要去對(duì)應(yīng). rid 暫時(shí)不需要.
subScribeReturns.setOk(true);
subScribeReturns.setConsumerTag(consumerTag);
subScribeReturns.setBasicProperties(basicProperties);
subScribeReturns.setBody(body);
byte[] payload = BinaryTool.toBytes(subScribeReturns);
Response response = new Response();
// 0xc 表示服務(wù)器給消費(fèi)者客戶端推送的消息數(shù)據(jù).
response.setType(0xc);
// response 的 payload 就是一個(gè) SubScribeReturns
response.setLength(payload.length);
response.setPayload(payload);
// 3. 把數(shù)據(jù)寫回給客戶端.
// 注意! 此處的 dataOutputStream 這個(gè)對(duì)象不能 close !!!
// 如果 把 dataOutputStream 關(guān)閉, 就會(huì)直接把 clientSocket 里的 outputStream 也關(guān)了.
// 此時(shí)就無法繼續(xù)往 socket 中寫入后續(xù)數(shù)據(jù)了.
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream, response);
}
});
} else if (request.getType() == 0xb) {
// 調(diào)用 basicAck 確認(rèn)消息.
BasicAckArguments arguments = (BasicAckArguments) basicArguments;
ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
} else {
// 當(dāng)前的 type 是非法的.
throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
}
// 3. 構(gòu)造響應(yīng)
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(basicArguments.getChannelId());
basicReturns.setRid(basicArguments.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toBytes(basicReturns);
Response response = new Response();
response.setType(request.getType());
response.setLength(payload.length);
response.setPayload(payload);
System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
+ ", type=" + response.getType() + ", length=" + response.getLength());
return response;
}
??實(shí)現(xiàn) clearClosedSession
這里要做的事情, 主要就是遍歷上述 sessions hash 表, 把該被關(guān)閉的 socket 對(duì)應(yīng)的鍵值對(duì), 統(tǒng)統(tǒng)刪掉
需要注意的是:
- 我們?cè)谶M(jìn)行迭代的時(shí)候,不要直接刪除,這樣會(huì)影響集合類的結(jié)構(gòu)
代碼實(shí)現(xiàn)如下:
private void clearClosedSession(Socket clientSocket) {
// 這里要做的事情, 主要就是遍歷上述 sessions hash 表, 把該被關(guān)閉的 socket 對(duì)應(yīng)的鍵值對(duì), 統(tǒng)統(tǒng)刪掉.
List<String> toDeleteChannelId = new ArrayList<>();
for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
if (entry.getValue() == clientSocket) {
// 不能在這里直接刪除!!!
// 這屬于使用集合類的一個(gè)大忌!!! 一邊遍歷, 一邊刪除!!!
// sessions.remove(entry.getKey());
toDeleteChannelId.add(entry.getKey());
}
}
for (String channelId : toDeleteChannelId) {
sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
}
?總結(jié)
關(guān)于《【消息隊(duì)列開發(fā)】 實(shí)現(xiàn)BrokerServer類——本體服務(wù)器》就講解到這兒,感謝大家的支持,歡迎各位留言交流以及批評(píng)指正,如果文章對(duì)您有幫助或者覺得作者寫的還不錯(cuò)可以點(diǎn)一下關(guān)注,點(diǎn)贊,收藏支持一下文章來源地址http://www.zghlxwxcb.cn/news/detail-854719.html
到了這里,關(guān)于【消息隊(duì)列開發(fā)】 實(shí)現(xiàn)BrokerServer類——本體服務(wù)器的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!