前面已經(jīng)講過了Java-NIO中的三大核心組件Selector、Channel、Buffer,現(xiàn)在組件我們回了,但是如何實(shí)現(xiàn)一個(gè)超級高并發(fā)的socket網(wǎng)絡(luò)通信程序呢?假設(shè),我們只有一臺內(nèi)存為32G的Intel-i710八核的機(jī)器,如何實(shí)現(xiàn)同時(shí)2萬個(gè)客戶端高并發(fā)非阻塞通信?可能你會說不可能實(shí)現(xiàn),答案是2萬的并發(fā)可能都低估了,Redis單機(jī)通信20萬的并發(fā)都是可以的,當(dāng)然達(dá)到20萬的并發(fā)對機(jī)器性能以及帶寬都需要非常高的要求。那么就不得不引出今天講解的Reactor反應(yīng)器模式,它可以說是一種高并發(fā)網(wǎng)絡(luò)編程中的設(shè)計(jì)模式,不包括在我們常說的23中設(shè)計(jì)模式之中。Netty網(wǎng)絡(luò)框架、Nginx服務(wù)器、Reids緩存等大名鼎鼎的中間件都是基于Reactor反應(yīng)器模式設(shè)計(jì)的,它就能提供超高并發(fā)的網(wǎng)絡(luò)通信,我學(xué)過之后一直感嘆這些大佬都是奇才,學(xué)這些思想精彩萬分!下面具體進(jìn)行介紹:
Reactor是什么?
Reactor就是一種網(wǎng)絡(luò)編程的設(shè)計(jì)模式,如果不知道Reactor那么學(xué)Netty的時(shí)候會非常困難,因?yàn)楹芏喔拍罹褪荝eactor,因此學(xué)會了Reactor在學(xué)Netty就非常簡單。其次,懂得高并發(fā)中間件的網(wǎng)絡(luò)通信設(shè)計(jì)的底層原理對提升自己的技術(shù)也是非常重要的,所以,學(xué)習(xí)像Netty這樣的“精品中的精品”框架,肯定也是需要先從設(shè)計(jì)模式入手的。而Netty的整體架構(gòu),就是基于這個(gè)著名反應(yīng)器模式。所以,學(xué)習(xí)和掌握反應(yīng)器模式,對于開始學(xué)習(xí)高并發(fā)通信(包括Netty框架)的人來說,一定是磨刀不誤砍柴工,況且很多中間件都是基于Netty來設(shè)計(jì)網(wǎng)絡(luò)通信模塊的。
思維風(fēng)暴開啟Reactor之路
好的,我們用一個(gè)例子開始講解Reactor原理,假設(shè)你是Doug Lea
,Java JUC包的作者, 也是Reactor設(shè)計(jì)模式的提出者之一。現(xiàn)在面臨的一個(gè)問題就是現(xiàn)在的軟件系統(tǒng)不能夠滿足日益增長的并發(fā)量,很多軟件系統(tǒng)一旦人訪問數(shù)多了要么卡死要么阻塞一段時(shí)間才有響應(yīng),用戶體驗(yàn)非常差,現(xiàn)在公司提出了這個(gè)需求需要你解決。請你思考:
單線程阻塞模式
首先TCP網(wǎng)絡(luò)通信需要先建立連接(三次握手)然后才可以傳輸數(shù)據(jù),于是你寫下了第一行解決的代碼:
1 while(true){
2 socket = accept(); //阻塞,接收連接
3 handle(socket) ; //讀取數(shù)據(jù)、業(yè)務(wù)處理、寫入結(jié)果
4 }
5 private void handle(socket){
6 String msg = socket.read(); //阻塞,讀取客戶端發(fā)送過來的數(shù)據(jù)
7 System.out.println(msg);
8 .... // 其他處理
9 }
解釋一下,上面采用一個(gè)循環(huán)的方式來解決這個(gè)問題,程序占用一個(gè)主線程不斷執(zhí)行while循環(huán)中的代碼,當(dāng)代碼執(zhí)行到第2行時(shí)如果沒有客戶端發(fā)生連接的請求則阻塞,不繼續(xù)向下執(zhí)行。直到某個(gè)客戶端發(fā)生連接請求,于是獲得了socket對象,這個(gè)對象假設(shè)包括客戶端的ip地址和端口號,并且可以通過socket與客戶端接受和發(fā)送數(shù)據(jù)。之后執(zhí)行到第6行代碼,這里也會阻塞直到用戶發(fā)生了數(shù)據(jù)。上面的服務(wù)器代碼如果只有一個(gè)客戶端與它交互是沒有問題的,如果超過一個(gè)用戶與之交互則會發(fā)生阻塞的情況,假設(shè)有兩個(gè)客戶A和B,A已經(jīng)連接好了服務(wù)器也就是上面代碼執(zhí)行到了第6行代碼進(jìn)行阻塞,此時(shí)服務(wù)器希望收到客戶發(fā)送的數(shù)據(jù)。就在阻塞的這個(gè)時(shí)候,如果B想要連接服務(wù)器,發(fā)送了連接請求,但是服務(wù)器代碼一直卡在第6行等待獲取客戶端的發(fā)生數(shù)據(jù),如果A一直不發(fā)送數(shù)據(jù)則B永遠(yuǎn)連不上服務(wù)器。除非等到A發(fā)送了一個(gè)數(shù)據(jù),于是程序運(yùn)行到第2行,然后接受B的連接請求,然后又卡在了第6行。很明顯,上面的網(wǎng)絡(luò)編程服務(wù)程序很糟糕,非???,連得上連不上完全看運(yùn)氣。失?。?/p>
這個(gè)時(shí)候,Doug Lea
進(jìn)行思考,阻塞是因?yàn)榫W(wǎng)絡(luò)編程就是基于事件觸發(fā)的,也就是說接受連接的第二行代碼和讀取數(shù)據(jù)的第六行代碼完全取決于客戶端,什么時(shí)候觸發(fā)完全隨機(jī),因此很難搞。另外一個(gè)最主要的原因是這個(gè)是單線程程序,那么使用多線程能不能解決呢?答案是基本上可以解決,而且早期的Tomcat服務(wù)器就是這樣設(shè)計(jì)的,這個(gè)模式就叫做 Connection Per Thread
模式。下面進(jìn)行詳細(xì)介紹!
多線程經(jīng)典Connection Per Thread模式
Connection Per Thread
即一個(gè)連接創(chuàng)建一個(gè)線程來處理,首先我們分析一下一臺上述的內(nèi)存32G的機(jī)器可以創(chuàng)建多少個(gè)線程,Java虛擬機(jī)默認(rèn)一個(gè)線程占用1MB的棧內(nèi)存,在不考慮其他情況下,假設(shè)分配給了虛擬機(jī)棧20G的空間,那么可以創(chuàng)建20*1024個(gè)線程來應(yīng)對網(wǎng)絡(luò)連接,也就是可以同時(shí)并發(fā)20480個(gè)客戶端的請求。我們先看如何實(shí)現(xiàn),再看它的缺點(diǎn)是什么,實(shí)現(xiàn)代碼如下:
public class ConnectionPerThread implements Runnable {
@Override
public void run(){
Socket socket = new Socket();
while(true){
acceptedSocket = socket.accept(); //依舊是阻塞方法,接受客戶端的連接請求
// 如果有一個(gè)連接就立即創(chuàng)建一個(gè)線程為這個(gè)連接服務(wù),直到連接斷開
Handler handler = new Handler(socket);
new Thread(handler).start(); // 啟動新線程執(zhí)行run方法
}
}
class Handler implements Runnable{
Socket socket;
public Handler(Socket socket){
this.socket = socket;
}
@Override
public void run() {
while (true){
String msg = socket.read(); //依舊是阻塞方法,接受客戶端的發(fā)送的數(shù)據(jù)
if("close".equals(msg)){ // 假設(shè)客戶端主動斷開發(fā)送`close`字符,NIO中是空字符串表示斷開
break; // 終止線程
}
// 也可以執(zhí)行寫操作,如果是發(fā)送大數(shù)據(jù)會明顯阻塞,如果小文件可視為非阻塞,本質(zhì)還是會阻塞
socket.write("hello 用戶!");
}
}
}
}
以上的Socket使用的是偽代碼,實(shí)際上需要使用OIO或者NIO的ServerSocket對象,反正能夠表達(dá)這個(gè)意思就行。其實(shí)上面的代碼還可以使用線程池來維護(hù)線程進(jìn)行優(yōu)化,但是這里只是為了舉例說明多線程也是可以的實(shí)現(xiàn)較高并發(fā)的網(wǎng)絡(luò)通信。下面來具體分析:
以上示例代碼中,對于每一個(gè)新的網(wǎng)絡(luò)連接都分配給一個(gè)線程。每個(gè)線程都獨(dú)自處理自己負(fù)責(zé)的socket連接的輸入和輸出。當(dāng)然,服務(wù)器的監(jiān)聽線程也是獨(dú)立的,任何的socket連接的輸入和輸出處理,不會阻塞到后面新socket連接的監(jiān)聽和建立,這樣,服務(wù)器的吞吐量就得到了提升。早期版本的Tomcat服務(wù)器,就是這樣實(shí)現(xiàn)的。Connection Per Thread模式(一個(gè)線程處理一個(gè)連接)的優(yōu)點(diǎn)是:解決了前面的新連接被嚴(yán)重阻塞的問題,在一定程度上,較大的提高了服務(wù)器的吞吐量。Connection Per Thread模式的缺點(diǎn)是:對應(yīng)于大量的連接,需要耗費(fèi)大量的線程資源,對線程資源要求太高。在系統(tǒng)中,線程是比較昂貴的系統(tǒng)資源。如果線程的數(shù)量太多,系統(tǒng)無法承受。而且,線程的反復(fù)創(chuàng)建、銷毀、線程的切換也需要代價(jià)。因此,在高并發(fā)的應(yīng)用場景下,多線程OIO的缺陷是致命的。新的問題來了:如何減少線程數(shù)量,比如說讓一個(gè)線程同時(shí)負(fù)責(zé)處理多個(gè)socket連接的輸入和輸出,行不行呢? 可以的,一個(gè)有效途徑是:使用Reactor反應(yīng)器模式。用反應(yīng)器模式對線程的數(shù)量進(jìn)行控制,做到一個(gè)線程處理大量的連接。它是如何做到呢?直接上正餐——多線程的Reactor反應(yīng)器模式。
多線程Reactor反應(yīng)器模式
喚醒你的回憶,還記得Selector和IO多路復(fù)用不?不記得的話請?jiān)L問:https://blog.csdn.net/cj151525/article/details/135695467 查看!我們前面講到,客戶端的連接和發(fā)送數(shù)據(jù)等行為是以IO事件的方式觸發(fā)Selector的查詢的,僅僅使用一個(gè)線程的Selector模式,就可以應(yīng)付大量的訪問,其主旨就是:如果某個(gè)用戶阻塞了那本線程就去為別的需要服務(wù)的用戶服務(wù),而不是傻傻等待你阻塞解除,總而言之就是線程只為通過Selector.select()
查詢出來的需要執(zhí)行的事件服務(wù)。因此,單線程下效率就非常高,例如Redis的數(shù)據(jù)處理模塊就是單線程的,單線程的優(yōu)點(diǎn)就是線程安全,CPU不需要頻繁上下文切換。這種模式下,并發(fā)量上10萬都是簡簡單單的。那么你敢想想如果我們引進(jìn)多線程將會有多高的并發(fā)量嗎?線程并不是越多越好,當(dāng)你的線程數(shù)量和你的CPU核心數(shù)相同時(shí)就不會頻繁發(fā)生CPU上下文切換,當(dāng)線程數(shù)遠(yuǎn)遠(yuǎn)超過CPU核心數(shù)才會頻繁發(fā)生導(dǎo)致執(zhí)行效率不高,甚至阻塞等問題。好的,目前基礎(chǔ)已經(jīng)講解完畢,下面正式引入Reactor反應(yīng)器模式。
引用一下Doug Lea大師在文章《Scalable IO in Java》中對反應(yīng)器模式的定義,具體如下:Reactor反應(yīng)器模式由Reactor反應(yīng)器線程、 Handlers處理器兩大角色組成,兩大角色的職責(zé)分別如下:
(1) Reactor反應(yīng)器線程的職責(zé):負(fù)責(zé)響應(yīng)IO事件,并且分發(fā)到Handlers處理器。
(2) Handlers處理器的職責(zé):非阻塞的執(zhí)行業(yè)務(wù)處理邏輯。
每一個(gè)單獨(dú)線程執(zhí)行的Selector我們就叫做Reactor反應(yīng)器。一個(gè)Reactor反應(yīng)器包括一個(gè)Selector對象,另外還有需要干的活兒,也就是run方法中需要執(zhí)行的邏輯,這個(gè)邏輯叫做Handler處理器。因此,如何理解Reactor反應(yīng)器,就是單獨(dú)線程來執(zhí)行的Selector。明白了這些之后,那么我們將Selector分為Boss和Worker,Boss只有一位負(fù)責(zé)用戶的連接請求與任務(wù)分發(fā),Worker可以有很多,負(fù)責(zé)發(fā)送和接受用戶的數(shù)據(jù)以及處理這些數(shù)據(jù)的中間過程。Boss和每個(gè)Worker就是一個(gè)Reactor,多線程Reactor反應(yīng)器模式的模型如下(黃色的是方法,橙色是對象):
下面是代碼實(shí)現(xiàn),注意為了和Netty中EventLoop
概念一致,這里Reactor
使用EventLoop
替代,你只要知道這兩的概念是同一個(gè),就是單獨(dú)線程執(zhí)行的Selector。代碼如下:
package com.cheney.nioBaseTest;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @version 1.0
* @Author Chenjie
* @Date 2024-01-21 18:39
* @注釋
*/
public class ReactorTest {
public static void main(String[] args) throws IOException {
new BossEventLoop().register();
}
/**
* BossReactor,EventLoop和Reactor是同一個(gè)概念
*/
@Slf4j
static class BossEventLoop implements Runnable {
private Selector bossSelector;
private WorkerEventLoop[] workers; // 一個(gè)boss負(fù)責(zé)分配任務(wù),worker負(fù)責(zé)執(zhí)行任務(wù)
private volatile boolean start = false; // 對象的方法只能執(zhí)行一次
AtomicInteger index = new AtomicInteger(); // WorkerEventLoop[]數(shù)組的下標(biāo)
public void register() throws IOException {
if (!start) {
// 連接Channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
bossSelector = Selector.open();
// Boss 注冊連接事件
SelectionKey ssckey = ssc.register(bossSelector, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
// 創(chuàng)建若干個(gè)WorkerReactor來讀取發(fā)送數(shù)據(jù)
workers = initEventLoops();
// 本Boss一個(gè)線程啟動起來先
new Thread(this, "boss").start();
log.debug("boss start...");
start = true;
}
}
/**
* 創(chuàng)建若干個(gè)WorkerEventLoop
* @return
*/
public WorkerEventLoop[] initEventLoops() {
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i);
}
return workerEventLoops;
}
/**
* Boss需要執(zhí)行連接和任務(wù)分發(fā),就是概念中的Handler處理器,圖中的AcceptorHandler
*/
@Override
public void run() {
while (true) {
try {
bossSelector.select();
Iterator<SelectionKey> iter = bossSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// 前面只注冊了連接事件,因此只要負(fù)責(zé)建立連接并將后續(xù)的任務(wù)分發(fā)給Worker就行
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();// 建立連接
sc.configureBlocking(false);
log.debug("{} connected", sc.getRemoteAddress());
// 分發(fā)給Worker來處理,這里是公平地輪詢,即每個(gè)Worker公平循環(huán)領(lǐng)取任務(wù)去執(zhí)行
// 因?yàn)槊總€(gè)Worker其實(shí)就是一個(gè)Selector,而每個(gè)Selector可以管理多個(gè)Channel(用戶交互)
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* WorkerReactor,主要負(fù)責(zé)讀取用戶發(fā)來的數(shù)據(jù)
*/
@Slf4j
static class WorkerEventLoop implements Runnable {
private Selector workerSelector;
private volatile boolean start = false;
private int index;
// 任務(wù)隊(duì)列,存放可執(zhí)行的命令,兩個(gè)線程需要傳參的話通過隊(duì)列來實(shí)現(xiàn)執(zhí)行邏輯解耦
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
public WorkerEventLoop(int index) {
this.index = index;
}
public void register(SocketChannel sc) throws IOException {
if (!start) {
workerSelector = Selector.open();
// 啟動一個(gè)新線程執(zhí)行本類的run方法
new Thread(this, "worker-" + index).start();
start = true;
}
tasks.add(() -> {
// 向任務(wù)隊(duì)列中添加任務(wù)(即需要執(zhí)行的指令)
try {
SelectionKey sckey = sc.register(workerSelector, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
workerSelector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
// 喚醒Selector
workerSelector.wakeup();
}
/**
* WorkerReactor 的Handler處理器,負(fù)責(zé)讀取用戶發(fā)過來的數(shù)據(jù)
*/
@Override
public void run() {
while (true) {
try {
workerSelector.select();
// 從任務(wù)隊(duì)列中獲取一個(gè)任務(wù)并執(zhí)行
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Set<SelectionKey> keys = workerSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) {
// 讀取客戶端發(fā)生過來的數(shù)據(jù)
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) { // 如果-1則是用戶斷開連接觸發(fā)的讀事件
key.cancel();
sc.close();
} else {
buffer.flip();
log.debug("{} message:", sc.getRemoteAddress());
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
總結(jié)
什么是Reactor?答:一個(gè)線程對應(yīng)一個(gè)Selector模式的對象,Reactor模式其中BossReactor負(fù)責(zé)客戶端連接與任務(wù)分發(fā)給WorkerReactor對象,WorkerReactor負(fù)責(zé)具體的數(shù)據(jù)發(fā)送與接受等操作。而各自所負(fù)責(zé)的任務(wù)也被叫做Handler(處理器)。相信看完上面的講解和代碼,你已經(jīng)知道了什么是Reactor模式了!文章來源:http://www.zghlxwxcb.cn/news/detail-814148.html
Reactor反應(yīng)器模式和優(yōu)點(diǎn)和缺點(diǎn)
反應(yīng)器模式和生產(chǎn)者消費(fèi)者模式有點(diǎn)相似,不過反應(yīng)器模式?jīng)]有生產(chǎn)者,而是通過Selector查詢已經(jīng)發(fā)生的事件從而委派給Worker進(jìn)行消費(fèi),可以認(rèn)為是只有消費(fèi)者的一種模式。反應(yīng)器模式和觀察者模式也有點(diǎn)相似,不同的是觀察者模式一旦發(fā)布者狀態(tài)變化時(shí),其他的所有觀察者都會收到通知從而執(zhí)行相應(yīng)的處理。而反應(yīng)器模式是一旦Selector查詢到了IO事件時(shí)只會指定某個(gè)Worker進(jìn)行處理而不是所有的Worker。文章來源地址http://www.zghlxwxcb.cn/news/detail-814148.html
優(yōu)點(diǎn)
- 響應(yīng)快,雖然同一反應(yīng)器線程本身是同步的,但不會被單個(gè)連接的IO操作所阻塞;
- 編程相對簡單,最大程度避免了復(fù)雜的多線程同步,也避免了多線程的各個(gè)進(jìn)程之間切換的開銷;
- 可擴(kuò)展,可以方便地通過增加反應(yīng)器線程的個(gè)數(shù)來充分利用CPU資源。
缺點(diǎn)
- 反應(yīng)器模式增加了一定的復(fù)雜性,因而有一定的門檻,并且不易于調(diào)試。
- 反應(yīng)器模式依賴于操作系統(tǒng)底層的IO多路復(fù)用系統(tǒng)調(diào)用的支持,如Linux中的epoll系統(tǒng)調(diào)用。如果操作系統(tǒng)的底層不支持IO多路復(fù)用,反應(yīng)器模式不會有那么高效。
- 同一個(gè)Handler業(yè)務(wù)線程中,如果出現(xiàn)一個(gè)長時(shí)間的數(shù)據(jù)讀寫,會影響這個(gè)反應(yīng)器中其他通道的IO處理。例如在大文件傳輸時(shí), IO操作就會影響其他客戶端(Client)的響應(yīng)時(shí)間。因而對于這種操作,還需要進(jìn)一步對反應(yīng)器模式進(jìn)行改進(jìn)。
到了這里,關(guān)于Java-NIO篇章(4)——Reactor反應(yīng)器模式的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!