国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

BIO、NIO和AIO

這篇具有很好參考價(jià)值的文章主要介紹了BIO、NIO和AIO。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

一.引言

何為IO

IO的過(guò)程

Java的3種網(wǎng)絡(luò)IO模型

阻塞和非阻塞IO

IO多路復(fù)用

異步和同步IO

二.BIO

三.NIO

1. 三大組件

Channel & Buffer

Selector

2.ByteBuffer

2.1ByteBuffer的使用

2.2ByteBuffer 結(jié)構(gòu)

?2.3ByteBuffer的常用方法

分配空間 ?

向 buffer 寫(xiě)入數(shù)據(jù)

從 buffer 讀取數(shù)據(jù)

字符串與 ByteBuffer 互轉(zhuǎn)

分散讀取、集中寫(xiě)入

2.4調(diào)試工具類(lèi)

3.文件編程

3.1FileChannel常用方法

獲取

讀取

寫(xiě)入

關(guān)閉

3.2兩個(gè)Channel之間傳輸數(shù)據(jù)

4.網(wǎng)絡(luò)編程

4.1ServerSocketChannel

4.2SocketChannel

4.3Selector

創(chuàng)建

綁定 Channel 事件

監(jiān)聽(tīng) Channel 事件

4.4處理 accept 事件

4.5處理 read 事件

消息邊界問(wèn)題

粘包問(wèn)題

半包問(wèn)題

如何正確處理TCP消息邊界?

client意外關(guān)閉或主動(dòng)關(guān)閉時(shí)

4.6處理 write 事件

一次無(wú)法保證把 buffer 中所有數(shù)據(jù)都寫(xiě)入 channel

4.7注意事項(xiàng)

?必須處理事件

水平觸發(fā)和邊緣觸發(fā)

事件處理完后需要在 selectedKeys 集合中移除對(duì)應(yīng)的SelectionKey

ByteBuffer 大小分配

select 何時(shí)不阻塞

5.多線程優(yōu)化

案例

如何拿到 cpu 個(gè)數(shù)

6.NIO vs BIO

stream vs channel

四.AIO


一.引言

何為IO

涉及計(jì)算機(jī)核心(CPU和內(nèi)存)與其他設(shè)備間數(shù)據(jù)遷移的過(guò)程,就是I/O。數(shù)據(jù)輸入到計(jì)算機(jī)內(nèi)存的過(guò)程即輸入,反之輸出到外部存儲(chǔ)(比如數(shù)據(jù)庫(kù),文件,遠(yuǎn)程主機(jī))的過(guò)程即輸出。?I/O 描述了計(jì)算機(jī)系統(tǒng)與外部設(shè)備之間通信的過(guò)程。

  • 磁盤(pán)I/O
    • 輸入:就是從磁盤(pán)讀取數(shù)據(jù)到內(nèi)存
    • 輸出:將內(nèi)存中的數(shù)據(jù)寫(xiě)入磁盤(pán)
  • 網(wǎng)絡(luò)I/O
    • 輸入:從網(wǎng)絡(luò)中的另一臺(tái)計(jì)算機(jī)或服務(wù)器獲取數(shù)據(jù),并將其加載到本地內(nèi)存中
    • 輸出:將本地內(nèi)存中的數(shù)據(jù)發(fā)送到網(wǎng)絡(luò)中的其他計(jì)算機(jī)或服務(wù)器

IO的過(guò)程

根據(jù)大學(xué)里學(xué)到的操作系統(tǒng)相關(guān)的知識(shí):為了保證操作系統(tǒng)的穩(wěn)定性和安全性,一個(gè)進(jìn)程的地址空間劃分為 用戶空間(User space)內(nèi)核空間(Kernel space ) 。

像我們平常運(yùn)行的應(yīng)用程序都是運(yùn)行在用戶空間,只有內(nèi)核空間才能進(jìn)行系統(tǒng)態(tài)級(jí)別的資源有關(guān)的操作,比如文件管理、進(jìn)程通信、內(nèi)存管理等等,因?yàn)檫@些都是比較危險(xiǎn)的操作,不可以由應(yīng)用程序亂來(lái),只能交給底層操作系統(tǒng)來(lái)。也就是說(shuō),我們想要進(jìn)行 IO 操作,只能發(fā)起系統(tǒng)調(diào)用請(qǐng)求操作系統(tǒng)來(lái)間接訪問(wèn)內(nèi)核空間

我們?cè)谄匠i_(kāi)發(fā)過(guò)程中接觸最多的就是 磁盤(pán) IO(讀寫(xiě)文件)網(wǎng)絡(luò) IO(網(wǎng)絡(luò)請(qǐng)求和響應(yīng))。

應(yīng)用程序的視角來(lái)看的話,我們的應(yīng)用程序?qū)Σ僮飨到y(tǒng)的內(nèi)核發(fā)起 IO 調(diào)用(系統(tǒng)調(diào)用),操作系統(tǒng)負(fù)責(zé)的內(nèi)核執(zhí)行具體的 IO 操作。也就是說(shuō),我們的應(yīng)用程序?qū)嶋H上只是發(fā)起了 IO 操作的調(diào)用而已,具體 IO 的執(zhí)行是由操作系統(tǒng)的內(nèi)核來(lái)完成的

當(dāng)應(yīng)用程序發(fā)起 I/O 調(diào)用后,會(huì)經(jīng)歷兩個(gè)步驟(IO執(zhí)行):

  1. 數(shù)據(jù)準(zhǔn)備:內(nèi)核等待 I/O 設(shè)備準(zhǔn)備好數(shù)據(jù),即操作系統(tǒng)將外部數(shù)據(jù)加載到內(nèi)核緩沖區(qū)
  2. 數(shù)據(jù)拷貝:內(nèi)核將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶進(jìn)程緩沖區(qū)

BIO、NIO和AIO,IO,nio

Java的3種網(wǎng)絡(luò)IO模型

Java中提供的IO有關(guān)的API,也是依賴操作系統(tǒng)層面的IO操作實(shí)現(xiàn)的。在Java中,主要有三種IO模型,分別是阻塞IO(BIO)、非阻塞IO(NIO)和 異步IO(AIO)。

可以把Java中的BIO、NIO和AIO理解為是Java語(yǔ)言對(duì)操作系統(tǒng)的5種IO模型的封裝(在Linux(UNIX)操作系統(tǒng)中,共有五種IO模型,分別是:阻塞IO模型、非阻塞IO模型、IO復(fù)用模型、信號(hào)驅(qū)動(dòng)IO模型以及異步IO模型)。程序員在使用這些API的時(shí)候,不需要關(guān)心操作系統(tǒng)層面的知識(shí),也不需要根據(jù)不同操作系統(tǒng)編寫(xiě)不同的代碼。只需要使用Java的API就可以了。

阻塞和非阻塞IO

上面已經(jīng)說(shuō)過(guò),應(yīng)用程序的IO實(shí)際是分為兩個(gè)步驟,IO調(diào)用和IO執(zhí)行。IO調(diào)用是由進(jìn)程發(fā)起,IO執(zhí)行是操作系統(tǒng)的工作。操作系統(tǒng)的IO情況決定了進(jìn)程IO調(diào)用是否能夠得到立即響應(yīng)。

  • 阻塞IO:如果操作系統(tǒng)尚未準(zhǔn)備好數(shù)據(jù),當(dāng)前進(jìn)程或線程一直等待直到其就緒BIO、NIO和AIO,IO,nio
  • 非阻塞IO:如果操作系統(tǒng)尚未準(zhǔn)備好數(shù)據(jù),進(jìn)程或線程并不一直等待其就緒,而是可以做其他事情。進(jìn)程/線程會(huì)周期性地輪詢或查詢IO操作的狀態(tài),以確定數(shù)據(jù)是否就緒。(因?yàn)樾枰l繁地從用戶態(tài)切換到內(nèi)核態(tài),事實(shí)上并未比阻塞IO好多少)BIO、NIO和AIO,IO,nio

IO多路復(fù)用

IO多路復(fù)用模型,就是通過(guò)一種新的系統(tǒng)調(diào)用,一個(gè)進(jìn)程可以監(jiān)視多個(gè)文件描述符,一旦某個(gè)描述符就緒(一般是內(nèi)核緩沖區(qū)可讀/可寫(xiě)),內(nèi)核能夠通知程序進(jìn)行相應(yīng)的IO系統(tǒng)調(diào)用。

Java實(shí)現(xiàn)IO多路復(fù)用的基本原理:通過(guò)select/epoll系統(tǒng)調(diào)用,單個(gè)線程不斷地輪詢select/epoll系統(tǒng)調(diào)用所負(fù)責(zé)的成百上千的socket連接,當(dāng)某個(gè)或某些socket網(wǎng)絡(luò)有連接數(shù)據(jù)到達(dá)了,就返回這些可以讀寫(xiě)的連接。好處就顯而易見(jiàn)了,通過(guò)一個(gè)系統(tǒng)調(diào)用,就可以查詢到可以讀寫(xiě)的一個(gè)甚至多個(gè)網(wǎng)絡(luò)連接。

  • select 調(diào)用:內(nèi)核提供的系統(tǒng)調(diào)用,它支持一次查詢多個(gè)系統(tǒng)調(diào)用的可用狀態(tài)。幾乎所有的操作系統(tǒng)都支持。
  • epoll 調(diào)用:屬于 select 調(diào)用的增強(qiáng)版本,優(yōu)化了 IO 的執(zhí)行效率

阻塞 IO vs 多路復(fù)用

BIO、NIO和AIO,IO,nio

BIO、NIO和AIO,IO,nio

異步和同步IO

  • 同步IO::線程自己去獲取結(jié)果(一個(gè)線程,發(fā)起和返回結(jié)果都是自己)。所以阻塞IO非阻塞IO包括IO多路復(fù)用都是同步的
  • 異步IO:線程自己不去獲取結(jié)果,而是由其它線程返回結(jié)果(至少兩個(gè)線程) 。異步模式下一定是非阻塞的,所以不存在異步阻塞。異步IO是指程序發(fā)起IO操作后,它可以繼續(xù)執(zhí)行其他任務(wù),而不必等待IO操作完成。當(dāng)IO操作完成后,程序會(huì)得到通知,可以處理已完成的IO操作。異步IO可以提高程序的并發(fā)性和響應(yīng)性,因?yàn)樗试S程序在等待IO的同時(shí)執(zhí)行其他任務(wù)。BIO、NIO和AIO,IO,nio

自己的理解:我感覺(jué)阻塞和非阻塞IO針對(duì)的是操作系統(tǒng)未準(zhǔn)備好數(shù)據(jù)時(shí)進(jìn)程的處理方式,是等待還是不等待。異步和同步IO針對(duì)的是IO操作未完成時(shí)(IO操作包括數(shù)據(jù)準(zhǔn)備和數(shù)據(jù)拷貝兩步驟)進(jìn)程的處理方式,是等待還是不等待。

二.BIO

  • Java BIO 就是傳統(tǒng)的 java io 編程,其相關(guān)的類(lèi)和接口在 java.io
  • BIO(blocking I/O) :同步阻塞 IO 模型?,即在讀寫(xiě)數(shù)據(jù)過(guò)程中會(huì)發(fā)生阻塞現(xiàn)象,直至有可供讀取的數(shù)據(jù)或者數(shù)據(jù)能夠?qū)懭搿?/li>
  • 服務(wù)器實(shí)現(xiàn)模式為 一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)器端就需 要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開(kāi)銷(xiāo),可以通過(guò)線程池機(jī)制改善(實(shí)現(xiàn)多個(gè)客戶連接服務(wù)器)

BIO、NIO和AIO,IO,nio

映射到Linux操作系統(tǒng)中,這就是一種最簡(jiǎn)單的IO模型,即阻塞IO。 阻塞 I/O 是最簡(jiǎn)單的 I/O 模型,一般表現(xiàn)為進(jìn)程或線程等待某個(gè)條件,如果條件不滿足,則一直等下去。條件滿足,則進(jìn)行下一步操作。

BIO、NIO和AIO,IO,nioBIO客戶端、服務(wù)端通信實(shí)現(xiàn)??

Server 服務(wù)端

/**
    目標(biāo):實(shí)現(xiàn)服務(wù)端可以同時(shí)接收多個(gè)客戶端的Socket通信需求。
    思路:是服務(wù)端每接收到一個(gè)客戶端socket請(qǐng)求對(duì)象之后都交給一個(gè)獨(dú)立的線程來(lái)處理客戶端的數(shù)據(jù)交互需求。
 */
public class Server {
    public static void main(String[] args) {
        try {
            // 1、注冊(cè)端口
            ServerSocket ss = new ServerSocket(9999);
            // 2、定義一個(gè)死循環(huán),負(fù)責(zé)不斷的接收客戶端的Socket鏈接請(qǐng)求
            while(true){
                Socket socket = ss.accept();
                // 3、創(chuàng)建一個(gè)獨(dú)立的線程來(lái)處理與這個(gè)客戶端的socket通信需求。
                new ServerThreadReader(socket).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

ServerThreadReader 服務(wù)端與客戶端保持通信的線程

public class ServerThreadReader extends Thread {
    private Socket socket;
    public ServerThreadReader(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            // 從socket對(duì)象中得到一個(gè)字節(jié)輸入流
            InputStream is = socket.getInputStream();
            // 使用緩沖字符輸入流包裝字節(jié)輸入流
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            String msg;
            while((msg = br.readLine())!=null){
                System.out.println(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Client 客戶端

/**
    客戶端
 */
public class Client {
    public static void main(String[] args) {
        try {
            // 1、請(qǐng)求與服務(wù)端的Socket對(duì)象鏈接
            Socket socket = new Socket("127.0.0.1" , 9999);
            // 2、得到一個(gè)打印流
            PrintStream ps = new PrintStream(socket.getOutputStream());
            // 3、使用循環(huán)不斷的發(fā)送消息給服務(wù)端接收
            Scanner sc = new Scanner(System.in);
            while(true){
                System.out.print("請(qǐng)說(shuō):");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

三.NIO

Java NIO(non-blocking)是從Java 1.4版本開(kāi)始引入的一個(gè)新的IO API,NIO 相關(guān)類(lèi)都被放在 java.nio 包及子包下,并且對(duì)原 java.io 包中的很多類(lèi)進(jìn)行改寫(xiě),可以替代標(biāo)準(zhǔn)的Java IO API。NIO與原來(lái)的IO有同樣的作用和目的,但是使用的方式完全不同,NIO支持面向緩沖區(qū)的、基于通道的IO操作。NIO將以更加高效的方式進(jìn)行讀寫(xiě)操作。

Java NIO(non-blocking) 映射的不是操作系統(tǒng)五大IO模型中的NIO模型(采用輪詢的方式檢查IO狀態(tài)),而是另外的一種模型,叫做IO多路復(fù)用模型( IO multiplexing )。

Java 中的 NIO ,有一個(gè)非常重要的選擇器 ( Selector )?的概念,也可以被稱為?多路復(fù)用器。通過(guò)它,只需要一個(gè)線程便可以管理多個(gè)客戶端連接。當(dāng)客戶端數(shù)據(jù)到了之后,才會(huì)為其服務(wù)。

BIO、NIO和AIO,IO,nio

NIO 有三大核心部分:Channel( 通道) ,Buffer( 緩沖區(qū)), Selector( 選擇器)

1. 三大組件

Channel & Buffer

channel 有一點(diǎn)類(lèi)似于 流,它就是讀寫(xiě)數(shù)據(jù)的雙向通道,可以從 channel 將數(shù)據(jù)讀入 buffer,也可以將 buffer 的數(shù)據(jù)寫(xiě)入 channel,而之前的 流 要么是輸入,要么是輸出,channel 比 流 更為底層

BIO、NIO和AIO,IO,nio

常見(jiàn)的 Channel 有

  • FileChannel (文件):從文件中讀寫(xiě)數(shù)據(jù)。
  • DatagramChannel?(UDP):能通過(guò) UDP 讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)。
  • SocketChannel(TCP Client):能通過(guò) TCP 讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)。
  • ServerSocketChannel(TCP Server):可以監(jiān)聽(tīng)新進(jìn)來(lái)的 TCP 連接,像 Web 服務(wù)器那樣。對(duì)每一個(gè)新進(jìn)來(lái)的連接都會(huì)創(chuàng)建一個(gè) SocketChannel

buffer 則用來(lái)緩沖讀寫(xiě)數(shù)據(jù),常見(jiàn)的 buffer 有

  • ByteBuffer(用的最多
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Selector

selector 的作用就是配合一個(gè)線程來(lái)管理多個(gè) channel,獲取這些 channel 上發(fā)生的事件,這些 channel 工作在非阻塞模式下,不會(huì)讓線程吊死在一個(gè) channel 上。適合連接數(shù)特別多,但流量低的場(chǎng)景(low traffic)

BIO、NIO和AIO,IO,nio

調(diào)用 selector 的 select() 會(huì)阻塞直到 channel 發(fā)生了讀寫(xiě)就緒事件,這些事件發(fā)生,select 方法就會(huì)返回這些事件交給 thread 來(lái)處理

2.ByteBuffer

2.1ByteBuffer的使用

  1. 向 buffer 寫(xiě)入數(shù)據(jù),例如調(diào)用 channel.read(buffer)
  2. 調(diào)用 flip() 切換至讀模式
  3. 從 buffer 讀取數(shù)據(jù),例如調(diào)用 buffer.get()
  4. 調(diào)用 compact() 或 clear() 切換至寫(xiě)模式,compact()會(huì)自動(dòng)壓縮未讀的,clear()則會(huì)直接清空
  5. 一次可能讀不完,重復(fù) 1~4 步驟讀,
@Slf4j
public class TestByteBuffer {
    public static void main(String[] args) {
        // FileChannel 獲得方式
        // 1. 輸入輸出流, 2. RandomAccessFile
        try (FileChannel channel = new RandomAccessFile("D:\\data.txt", "rw").getChannel()) {
            // 準(zhǔn)備緩沖區(qū),指定容量后不可更改
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while(true) {
                // 從 channel 讀取數(shù)據(jù),向 buffer 寫(xiě)入
                int len = channel.read(buffer);
                log.debug("讀取到的字節(jié)數(shù) {}", len);
                if(len == -1) { // 沒(méi)有內(nèi)容了
                    break;
                }
                // 打印 buffer 的內(nèi)容
                buffer.flip(); // 切換至讀模式
                while(buffer.hasRemaining()) { // 是否還有剩余未讀數(shù)據(jù)
                    byte b = buffer.get();//get()會(huì)改變讀指針,但get(i)不會(huì),直接根據(jù)索引查找位置
                    log.debug("實(shí)際字節(jié) {}", (char) b);
                }
                buffer.clear(); // 切換為寫(xiě)模式
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.2ByteBuffer 結(jié)構(gòu)

ByteBuffer的結(jié)構(gòu)可以看成一個(gè)連續(xù)的數(shù)組,有以下重要屬性

  • capacity:容量
  • position:起始位置
  • limit:寫(xiě)入/讀取限制位置

一開(kāi)始

BIO、NIO和AIO,IO,nio

寫(xiě)模式下,position 是寫(xiě)入位置,limit 等于容量,下圖表示寫(xiě)入了 4 個(gè)字節(jié)后的狀態(tài) ?BIO、NIO和AIO,IO,nio

flip 動(dòng)作發(fā)生后,position 切換為讀取位置,limit 切換為讀取限制 ?BIO、NIO和AIO,IO,nio

讀取 4 個(gè)字節(jié)后,狀態(tài)

BIO、NIO和AIO,IO,nio

clear 動(dòng)作發(fā)生后,狀態(tài)

BIO、NIO和AIO,IO,nio

compact 方法,是把未讀完的部分向前壓縮,使position變成剩余未讀的字節(jié)數(shù),然后切換至寫(xiě)模式

BIO、NIO和AIO,IO,nio2.3ByteBuffer的常用方法

分配空間 ?

分配容量后就不可修改

ByteBuffer byteBuffer1 = ByteBuffer.allocate(容量);//class java.nio.HeapByteBuffer
ByteBuffer byteBuffer2 = ByteBuffer.allocateDirect(容量);//class java.nio.DirectByteBuffer

兩種方法返回的實(shí)現(xiàn)類(lèi)不同:

  • HeapByteBuffer:分配在 java 堆內(nèi)存,讀寫(xiě)效率較低,受到 GC(垃圾回收) 的影響
  • DirectByteBuffer:通過(guò)調(diào)用本地操作系統(tǒng)的內(nèi)存管理機(jī)制來(lái)分配堆外內(nèi)存,讀寫(xiě)效率高(不需要通過(guò)額外的復(fù)制操作將數(shù)據(jù)從堆內(nèi)存復(fù)制到物理內(nèi)存),不會(huì)受 GC 影響,但分配的效率低,并且如果釋放不完全會(huì)造成內(nèi)存泄漏
向 buffer 寫(xiě)入數(shù)據(jù)

有兩種辦法

  • 調(diào)用 channel 的 read 方法
  • 調(diào)用 buffer 自己的 put 方法
從 buffer 讀取數(shù)據(jù)

同樣有兩種辦法

  • 調(diào)用 channel 的 write 方法
  • 調(diào)用 buffer 自己的 get 方法

get 方法會(huì)讓 position 讀指針向后走,如果想重復(fù)讀取數(shù)據(jù)

  • 可以調(diào)用 rewind 方法將 position 重新置為 0
  • 或者調(diào)用 get(int i) 方法獲取索引 i 的內(nèi)容,它不會(huì)移動(dòng)讀指針
字符串與 ByteBuffer 互轉(zhuǎn)

兩種方法:

ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");

debug(buffer1);
debug(buffer2);

CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());

Buffer 是非線程安全

分散讀取、集中寫(xiě)入

2.4調(diào)試工具類(lèi)

netty依賴

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.51.Final</version>
        </dependency>
import io.netty.util.internal.StringUtil;

import java.nio.ByteBuffer;

import static io.netty.util.internal.MathUtil.isOutOfBounds;
import static io.netty.util.internal.StringUtil.NEWLINE;


public class ByteBufferUtil {
    private static final char[] BYTE2CHAR = new char[256];
    private static final char[] HEXDUMP_TABLE = new char[256 * 4];
    private static final String[] HEXPADDING = new String[16];
    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
    private static final String[] BYTE2HEX = new String[256];
    private static final String[] BYTEPADDING = new String[16];

    static {
        final char[] DIGITS = "0123456789abcdef".toCharArray();
        for (int i = 0; i < 256; i++) {
            HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
            HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
        }

        int i;

        // Generate the lookup table for hex dump paddings
        for (i = 0; i < HEXPADDING.length; i++) {
            int padding = HEXPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding * 3);
            for (int j = 0; j < padding; j++) {
                buf.append("   ");
            }
            HEXPADDING[i] = buf.toString();
        }

        // Generate the lookup table for the start-offset header in each row (up to 64KiB).
        for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
            StringBuilder buf = new StringBuilder(12);
            buf.append(NEWLINE);
            buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
            buf.setCharAt(buf.length() - 9, '|');
            buf.append('|');
            HEXDUMP_ROWPREFIXES[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-hex-dump conversion
        for (i = 0; i < BYTE2HEX.length; i++) {
            BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
        }

        // Generate the lookup table for byte dump paddings
        for (i = 0; i < BYTEPADDING.length; i++) {
            int padding = BYTEPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding);
            for (int j = 0; j < padding; j++) {
                buf.append(' ');
            }
            BYTEPADDING[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-char conversion
        for (i = 0; i < BYTE2CHAR.length; i++) {
            if (i <= 0x1f || i >= 0x7f) {
                BYTE2CHAR[i] = '.';
            } else {
                BYTE2CHAR[i] = (char) i;
            }
        }
    }

    /**
     * 打印所有內(nèi)容
     * @param buffer
     */
    public static void debugAll(ByteBuffer buffer) {
        int oldlimit = buffer.limit();
        buffer.limit(buffer.capacity());
        StringBuilder origin = new StringBuilder(256);
        appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
        System.out.println("+--------+-------------------- all ------------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
        System.out.println(origin);
        buffer.limit(oldlimit);
    }

    /**
     * 打印可讀取內(nèi)容
     * @param buffer
     */
    public static void debugRead(ByteBuffer buffer) {
        StringBuilder builder = new StringBuilder(256);
        appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
        System.out.println("+--------+-------------------- read -----------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
        System.out.println(builder);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put(new byte[]{97, 98, 99, 100});
        debugAll(buffer);
    }

    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
        if (isOutOfBounds(offset, length, buf.capacity())) {
            throw new IndexOutOfBoundsException(
                    "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
                            + ") <= " + "buf.capacity(" + buf.capacity() + ')');
        }
        if (length == 0) {
            return;
        }
        dump.append(
                "         +-------------------------------------------------+" +
                        NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                        NEWLINE + "+--------+-------------------------------------------------+----------------+");

        final int startIndex = offset;
        final int fullRows = length >>> 4;
        final int remainder = length & 0xF;

        // Dump the rows which have 16 bytes.
        for (int row = 0; row < fullRows; row++) {
            int rowStartIndex = (row << 4) + startIndex;

            // Per-row prefix.
            appendHexDumpRowPrefix(dump, row, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + 16;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(" |");

            // ASCII dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append('|');
        }

        // Dump the last row which has less than 16 bytes.
        if (remainder != 0) {
            int rowStartIndex = (fullRows << 4) + startIndex;
            appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + remainder;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(HEXPADDING[remainder]);
            dump.append(" |");

            // Ascii dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append(BYTEPADDING[remainder]);
            dump.append('|');
        }

        dump.append(NEWLINE +
                "+--------+-------------------------------------------------+----------------+");
    }

    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
        if (row < HEXDUMP_ROWPREFIXES.length) {
            dump.append(HEXDUMP_ROWPREFIXES[row]);
        } else {
            dump.append(NEWLINE);
            dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
            dump.setCharAt(dump.length() - 9, '|');
            dump.append('|');
        }
    }

    public static short getUnsignedByte(ByteBuffer buffer, int index) {
        return (short) (buffer.get(index) & 0xFF);
    }

3.文件編程

3.1FileChannel常用方法

FileChannel 只能工作在阻塞模式下,其他與網(wǎng)絡(luò)有關(guān)的Channel則有阻塞模式與非阻塞模式兩種

獲取

不能直接打開(kāi) FileChannel,必須通過(guò) FileInputStream、FileOutputStream 或者RandomAccessFile 來(lái)獲取 FileChannel,它們都有 getChannel 方法

  • 通過(guò) FileInputStream 獲取的 channel 只能讀
  • 通過(guò) FileOutputStream 獲取的 channel 只能寫(xiě)
  • 通過(guò) RandomAccessFile 是否能讀寫(xiě)根據(jù)構(gòu)造 RandomAccessFile 時(shí)的讀寫(xiě)模式(rw)決定
讀取

會(huì)從 channel 讀取數(shù)據(jù)填充 ByteBuffer,返回值表示讀到了多少字節(jié),-1 表示到達(dá)了文件的末尾

int readBytes = channel.read(buffer);
寫(xiě)入
ByteBuffer buffer = ...;
buffer.put(...); // 存入數(shù)據(jù)
buffer.flip();   // 切換讀模式

while(buffer.hasRemaining()) {
    channel.write(buffer);
}

在 while 中調(diào)用 channel.write 是因?yàn)?write 方法并不能保證一次將 buffer 中的內(nèi)容全部寫(xiě)入 channel

關(guān)閉

channel 必須關(guān)閉,不過(guò)調(diào)用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法會(huì)間接地調(diào)用 channel 的 close 方法

3.2兩個(gè)Channel之間傳輸數(shù)據(jù)

超過(guò) 2g 大小的文件傳輸:

transferTo(起始位置,傳輸數(shù),傳輸目標(biāo)地)

public class TestFileChannelTransferTo {
    public static void main(String[] args) {
        try (
                FileChannel from = new FileInputStream("data.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel();
        ) {
            // 效率高,底層會(huì)利用操作系統(tǒng)的零拷貝進(jìn)行優(yōu)化
            long size = from.size();
            // left 變量代表還剩余多少字節(jié)
            for (long left = size; left > 0; ) {
                System.out.println("position:" + (size - left) + " left:" + left);
                left -= from.transferTo((size - left), left, to);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

4.網(wǎng)絡(luò)編程

4.1ServerSocketChannel

Java NIO中的 ServerSocketChannel 是一個(gè)可以監(jiān)聽(tīng)新進(jìn)來(lái)的TCP連接的通道, 就像標(biāo)準(zhǔn)IO中的ServerSocket一樣。ServerSocketChannel類(lèi)在 java.nio.channels包中。

  • ServerSocketChannel
    • ServerSocketChannel是非阻塞的,這意味著它可以在沒(méi)有數(shù)據(jù)可用的情況下立即返回,而不必等待數(shù)據(jù)到達(dá)。
    • ServerSocketChannel通常與Selector一起使用,可以使用單個(gè)線程處理多個(gè)通道的I/O操作。
  • ServerSocket
    • ServerSocket是阻塞的,它會(huì)一直等待,直到有連接請(qǐng)求到達(dá)。
    • ServerSocket通常在一個(gè)獨(dú)立的線程中等待連接請(qǐng)求,并為每個(gè)連接創(chuàng)建一個(gè)新的線程進(jìn)行處理。
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(8080));

ServerSocketChannel可以設(shè)置成非阻塞模式(默認(rèn)為阻塞模式)。在非阻塞模式下,accept() 方法會(huì)立刻返回,如果還沒(méi)有新進(jìn)來(lái)的連接,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null。如:??

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel != null){
        //使用socketChannel做一些工作...
    }
}

4.2SocketChannel

Java NIO中的SocketChannel是一個(gè)連接到TCP網(wǎng)絡(luò)套接字的通道。SocketChannel和標(biāo)準(zhǔn)IO中的Socket是兩種不同的網(wǎng)絡(luò)編程方式。

  • SocketChannel
    • SocketChannel是非阻塞的,可以在沒(méi)有數(shù)據(jù)可用的情況下立即返回。
    • SocketChannel提供了讀取和寫(xiě)入緩沖區(qū)的方法,可以使用直接緩沖區(qū)(Direct Buffer)或者間接緩沖區(qū)(Heap Buffer)進(jìn)行數(shù)據(jù)傳輸。
    • SocketChannel通常與Selector一起使用,可以使用單個(gè)線程處理多個(gè)通道的I/O操作。
  • Socket
    • Socket是阻塞的,讀取和寫(xiě)入操作會(huì)一直等待,直到有數(shù)據(jù)可用或操作完成。
    • Socket使用InputStream和OutputStream進(jìn)行讀寫(xiě)操作。
    • Socket通常在一個(gè)獨(dú)立的線程中進(jìn)行讀寫(xiě)操作。

可以通過(guò)以下2種方式創(chuàng)建SocketChannel: ?

  • 打開(kāi)一個(gè)SocketChannel并連接到互聯(lián)網(wǎng)上的某臺(tái)服務(wù)器。
  • 一個(gè)新連接到達(dá)ServerSocketChannel時(shí),會(huì)創(chuàng)建一個(gè)SocketChannel。??
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

4.3Selector

選擇器(Selector) 是 SelectableChannle 對(duì)象的多路復(fù)用器,Selector 可以同時(shí)監(jiān)控多個(gè) SelectableChannel 的 IO 狀況,也就是說(shuō),利用 Selector可使一個(gè)單獨(dú)的線程管理多個(gè) Channel。Selector 是非阻塞 IO 的核心

BIO、NIO和AIO,IO,nio

創(chuàng)建

創(chuàng)建 Selector :通過(guò)調(diào)用 Selector.open() 方法創(chuàng)建一個(gè) Selector。

Selector selector = Selector.open();
綁定 Channel 事件

也稱之為注冊(cè)事件,綁定的事件 selector 才會(huì)關(guān)心

channel.configureBlocking(false);//channel默認(rèn)為阻塞模式,需要手動(dòng)設(shè)置
SelectionKey key = channel.register(selector,綁定事件,附件);

注冊(cè)到selector上的channel 必須工作在非阻塞模式,F(xiàn)ileChannel 沒(méi)有非阻塞模式,因此不能配合 selector 一起使用

當(dāng)調(diào)用 register(Selector sel, int ops) 將通道注冊(cè)選擇器時(shí),選擇器對(duì)通道的監(jiān)聽(tīng)事件,需要通過(guò)第二個(gè)參數(shù) ops 指定。可以監(jiān)聽(tīng)的事件類(lèi)型(用 可使用 SelectionKey 的四個(gè)常量 表示):

  • 讀 : SelectionKey.OP_READ (1)
  • 寫(xiě) : SelectionKey.OP_WRITE (4)
  • 連接 : SelectionKey.OP_CONNECT (8)
  • 接收 : SelectionKey.OP_ACCEPT (16)

若注冊(cè)時(shí)不止監(jiān)聽(tīng)一個(gè)事件,則可以使用“位或”操作符連接。

int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE 

第三個(gè)參數(shù)為附件,一般為bytebuffer。服務(wù)器在讀寫(xiě) bytebuffer 時(shí)很可能一次讀取不完,需要多次讀取,所以每次讀寫(xiě)時(shí)需要做到數(shù)據(jù)共享。但我們又需要保證每個(gè)socketchannel都有自己的bytebuffer,則可以通過(guò)附件來(lái)指定。

監(jiān)聽(tīng) Channel 事件

方法1,阻塞直到綁定事件發(fā)生

int count = selector.select();

方法2,阻塞直到綁定事件發(fā)生,或是超時(shí)(時(shí)間單位為 ms)

int count = selector.select(long timeout);

方法3,不會(huì)阻塞,也就是不管有沒(méi)有事件,立刻返回,自己根據(jù)返回值檢查是否有事件

int count = selector.selectNow();

4.4處理 accept 事件

client

public class Client {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 8080)) {
            System.out.println(socket);
            socket.getOutputStream().write("world".getBytes());
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Server

@Slf4j
public class Server{
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
                log.debug("select count: {}", count);
                // 獲取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();
                // 遍歷所有事件,逐一處理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判斷事件類(lèi)型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必須處理
                        SocketChannel sc = c.accept();
                        log.debug("{}", sc);
                    }
                    // 處理完畢,必須將事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

4.5處理 read 事件

?Server

@Slf4j
public class Server {
    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            // 找到一條完整消息
            if (source.get(i) == '\n') {
                int length = i + 1 - source.position();
                // 把這條完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                // 從 source 讀,向 target 寫(xiě)
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        source.compact(); // 0123456789abcdef  position 16 limit 16
    }

    public static void main(String[] args) throws IOException {
        // 1. 創(chuàng)建 selector, 管理多個(gè) channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        // 2. 建立 selector 和 channel 的聯(lián)系(注冊(cè))
        // SelectionKey 就是將來(lái)事件發(fā)生后,通過(guò)它可以知道事件和哪個(gè)channel的事件
        SelectionKey sscKey = ssc.register(selector, 0, null);//selector、關(guān)注事件、附件
        // key 只關(guān)注 accept 事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("sscKey:{}", sscKey);
        while (true) {
            // 3. select 方法, 沒(méi)有事件發(fā)生,線程阻塞,有事件,線程才會(huì)恢復(fù)運(yùn)行
            // select 在事件未處理時(shí),它不會(huì)阻塞, 事件發(fā)生后要么處理,要么取消,不能置之不理
            selector.select();
            // 4. 處理事件, selectedKeys 內(nèi)部包含了所有發(fā)生的事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                // 處理key 時(shí),要從 selectedKeys 集合中刪除,否則下次處理就會(huì)有問(wèn)題
                iter.remove();
                log.debug("key: {}", key);
                // 5. 區(qū)分事件類(lèi)型
                if (key.isAcceptable()) { // 如果是 accept
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                    // 將一個(gè) byteBuffer 作為附件關(guān)聯(lián)到 selectionKey 上
                    SelectionKey scKey = sc.register(selector, 0, buffer);//selector、關(guān)注事件、附件
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                    log.debug("scKey:{}", scKey);
                } else if (key.isReadable()) { // 如果是 read
                    try {
                        SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發(fā)事件的channel
                        // 獲取 selectionKey 上關(guān)聯(lián)的附件
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int read = channel.read(buffer); // 如果是正常斷開(kāi),read 的方法的返回值是 -1
                        if(read == -1) {
                            key.cancel();
                        } else {
                            split(buffer);
                            // 需要擴(kuò)容
                            if (buffer.position() == buffer.limit()) {
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                buffer.flip();
                                newBuffer.put(buffer); // 0123456789abcdef3333\n
                                key.attach(newBuffer);//關(guān)聯(lián)新的ByteBuffer 
                            }
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();  // 因?yàn)榭蛻舳藬嚅_(kāi)了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)
                    }
                }
            }
        }
    }
}

如果單個(gè)消息大于bytebuffer則會(huì)進(jìn)行擴(kuò)容,但只會(huì)越來(lái)越大而不會(huì)變小,這實(shí)際上會(huì)造成空間浪費(fèi)。而Netty在bytebuffer上做了優(yōu)化,能夠自適應(yīng)的調(diào)整bytebuffer的大小

Client

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        SocketAddress address = sc.getLocalAddress();
//        sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
        sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
        System.in.read();
    }
}
消息邊界問(wèn)題

TCP是基于字節(jié)流的協(xié)議,這意味著在傳輸過(guò)程中沒(méi)有明確的消息邊界。發(fā)送方將數(shù)據(jù)流劃分為T(mén)CP段,但接收方無(wú)法直接知道發(fā)送方在發(fā)送的每個(gè)TCP段的邊界位置。接收方只能接收到連續(xù)的字節(jié)流,并需要根據(jù)應(yīng)用層協(xié)議或其他手段來(lái)確定消息邊界。

BIO、NIO和AIO,IO,nio

粘包問(wèn)題

從TCP的這些細(xì)節(jié)我們可以看出,由于TCP是面向字節(jié)流的協(xié)議,不論是在發(fā)送端,傳輸鏈路,還是在接收端,多個(gè)TCP數(shù)據(jù)包,都有可能被合并成一個(gè)。這種特點(diǎn)會(huì)給我們?cè)斐梢恍├_,比如我們發(fā)送了兩條消息“hello”和“world”,預(yù)期應(yīng)該是這樣的

BIO、NIO和AIO,IO,nio

但是有可能會(huì)變成這樣子

BIO、NIO和AIO,IO,nio

到這里我們可以看出,所謂的粘包問(wèn)題,實(shí)際上問(wèn)的是:TCP的上層應(yīng)用如何正確處理消息邊界。

具體來(lái)講,不是所有的粘包現(xiàn)象都需要處理,若傳輸?shù)臄?shù)據(jù)為不帶結(jié)構(gòu)的連續(xù)流數(shù)據(jù)(如文件傳輸),則不必把粘連的包分開(kāi)(簡(jiǎn)稱分包)。但在實(shí)際工程應(yīng)用中,傳輸?shù)臄?shù)據(jù)一般為帶結(jié)構(gòu)的數(shù)據(jù),這時(shí)就需要做分包處理。

半包問(wèn)題

消息邊界問(wèn)題還可能會(huì)帶來(lái)半包問(wèn)題,半包問(wèn)題是指在數(shù)據(jù)傳輸中,數(shù)據(jù)包沒(méi)有完整地傳輸完成就被接收端接收到,造成接收到的數(shù)據(jù)包不完整,即"半包"。這可能會(huì)導(dǎo)致數(shù)據(jù)不完整或無(wú)法正確解析。

如何正確處理TCP消息邊界?

使用標(biāo)準(zhǔn)的應(yīng)用層協(xié)議(比如:http、https)來(lái)封裝要傳輸?shù)牟欢ㄩL(zhǎng)的數(shù)據(jù)包

在每條數(shù)據(jù)的尾部添加特殊字符, 如果遇到特殊字符, 代表當(dāng)條數(shù)據(jù)接收完畢了

缺點(diǎn): 效率低, 需要一個(gè)字節(jié)一個(gè)字節(jié)接收, 接收一個(gè)字節(jié)判斷一次, 判斷是不是那個(gè)特殊字符串

在發(fā)送數(shù)據(jù)塊之前, 在數(shù)據(jù)塊最前邊添加一個(gè)固定大小的數(shù)據(jù)頭, 這時(shí)候數(shù)據(jù)由兩部分組成:數(shù)據(jù)頭+數(shù)據(jù)塊

  • 數(shù)據(jù)頭:存儲(chǔ)當(dāng)前數(shù)據(jù)包的總字節(jié)數(shù),接收端先接收數(shù)據(jù)頭,然后在根據(jù)數(shù)據(jù)頭接收對(duì)應(yīng)大小的字節(jié)
  • 數(shù)據(jù)塊:當(dāng)前數(shù)據(jù)包的內(nèi)容

固定長(zhǎng)度:發(fā)送方將每個(gè)消息固定為相同的長(zhǎng)度,不足部分使用特定的填充字符。接收方按照固定長(zhǎng)度來(lái)切分接收到的字節(jié)流,并去除填充字符。

缺點(diǎn): 浪費(fèi)帶寬和空間

client意外關(guān)閉或主動(dòng)關(guān)閉時(shí)

client無(wú)論是意外關(guān)閉還是主動(dòng)關(guān)閉都會(huì)觸發(fā)一次讀事件,如果不做相應(yīng)處理會(huì)使服務(wù)端異常

意外關(guān)閉:在異常處理時(shí)記得關(guān)閉 事件

BIO、NIO和AIO,IO,nio

主動(dòng)關(guān)閉:根據(jù)read的返回值判斷關(guān)閉的讀事件

BIO、NIO和AIO,IO,nio

4.6處理 write 事件

server

public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, 0, null);
                    sckey.interestOps(SelectionKey.OP_READ);
                    // 1. 向客戶端發(fā)送大量數(shù)據(jù)
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 5000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());

                    // 2. 返回值代表實(shí)際寫(xiě)入的字節(jié)數(shù)
                    int write = sc.write(buffer);
                    System.out.println(write);

                    // 3. 判斷是否有剩余內(nèi)容
                    if (buffer.hasRemaining()) {
                        // 4. 關(guān)注可寫(xiě)事件   1                     4
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
//                        sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
                        // 5. 把未寫(xiě)完的數(shù)據(jù)掛到 sckey 上
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println(write);
                    // 6. 清理操作
                    if (!buffer.hasRemaining()) {
                        key.attach(null); // 需要清除buffer
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);//不需關(guān)注可寫(xiě)事件
                    }
                }
            }
        }
    }
}

client

public class WriteClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));

        // 3. 接收數(shù)據(jù)
        int count = 0;
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}
一次無(wú)法保證把 buffer 中所有數(shù)據(jù)都寫(xiě)入 channel
  • 非阻塞模式下,無(wú)法保證把 buffer 中所有數(shù)據(jù)都寫(xiě)入 channel,因此需要追蹤 write 方法的返回值(代表實(shí)際寫(xiě)入字節(jié)數(shù))
  • 用 selector 監(jiān)聽(tīng)所有 channel 的可寫(xiě)事件,每個(gè) channel 都需要一個(gè) key 來(lái)跟蹤 buffer,但這樣又會(huì)導(dǎo)致占用內(nèi)存過(guò)多,就有兩階段策略
    1. 當(dāng)消息處理器第一次寫(xiě)入消息時(shí),才將 channel 注冊(cè)到 selector 上
    2. selector 檢查 channel 上的可寫(xiě)事件,如果所有的數(shù)據(jù)寫(xiě)完了,就取消 channel 的注冊(cè)
    3. 如果不取消,會(huì)每次可寫(xiě)均會(huì)觸發(fā) write 事件

4.7注意事項(xiàng)

?必須處理事件
SelectionKey sscKey = ssc.register(selector, 0, null);//selector、關(guān)注事件(0表示不關(guān)注任何事件)、附件
// key 只關(guān)注 accept 事件,如果之前關(guān)注了其他事件會(huì)覆蓋,可以用 + 或 | 連接
sscKey.interestOps(SelectionKey.OP_ACCEPT);

事件發(fā)生后,要么處理,要么取消(cancel,cancel 會(huì)取消注冊(cè)在 selector 上的 channel,并從 keys 集合中刪除 key 后續(xù)不會(huì)再監(jiān)聽(tīng)事件 ),不能什么都不做,否則下次該事件仍會(huì)觸發(fā)(selector.select()不會(huì)阻塞),這是因?yàn)?nio 底層使用的是水平觸發(fā)。

水平觸發(fā)和邊緣觸發(fā)
  • 水平觸發(fā):套接字上只要有數(shù)據(jù),就一直觸發(fā)
  • 邊緣觸發(fā):套接字上有數(shù)據(jù)到來(lái),才觸發(fā)一次事件,無(wú)論數(shù)據(jù)是否讀/寫(xiě)完
事件處理完后需要在 selectedKeys 集合中移除對(duì)應(yīng)的SelectionKey

?因?yàn)?select 在事件發(fā)生后,就會(huì)將相關(guān)的 key 放入 selectedKeys 集合,但不會(huì)在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如:

  • 第一次觸發(fā)了 ssckey 上的 accept 事件,沒(méi)有移除 ssckey
  • 第二次觸發(fā)了 sckey 上的 read 事件,但這時(shí) selectedKeys 中還有上次的 ssckey ,在處理時(shí)因?yàn)闆](méi)有真正的 serverSocket 連上了,就會(huì)導(dǎo)致空指針異常

BIO、NIO和AIO,IO,nio

ByteBuffer 大小分配
  • 每個(gè) channel 都需要記錄可能被切分的消息,因?yàn)?ByteBuffer 不能被多個(gè) channel 共同使用,因此需要為每個(gè) channel 維護(hù)一個(gè)獨(dú)立的 ByteBuffer
  • ByteBuffer 不能太大,比如一個(gè) ByteBuffer 1Mb 的話,要支持百萬(wàn)連接就要 1Tb 內(nèi)存,因此需要設(shè)計(jì)大小可變的 ByteBuffer
    1. 一種思路是首先分配一個(gè)較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點(diǎn)是消息連續(xù)容易處理,缺點(diǎn)是數(shù)據(jù)拷貝耗費(fèi)性能。
    2. 另一種思路是用多個(gè)數(shù)組組成 buffer,一個(gè)數(shù)組不夠,把多出來(lái)的內(nèi)容寫(xiě)入新的數(shù)組,與前面的區(qū)別是消息存儲(chǔ)不連續(xù)解析復(fù)雜,優(yōu)點(diǎn)是避免了拷貝引起的性能損耗
select 何時(shí)不阻塞
  • 事件發(fā)生時(shí)
    1. 客戶端發(fā)起連接請(qǐng)求,會(huì)觸發(fā) accept 事件
    2. 客戶端發(fā)送數(shù)據(jù)過(guò)來(lái),客戶端正常、異常關(guān)閉時(shí),都會(huì)觸發(fā) read 事件,另外如果發(fā)送的數(shù)據(jù)大于 buffer 緩沖區(qū),會(huì)觸發(fā)多次讀取事件
    3. channel 可寫(xiě),會(huì)觸發(fā) write 事件
    4. 在 linux 下 nio bug 發(fā)生時(shí)
  • 調(diào)用 selector.wakeup()
  • 調(diào)用 selector.close()
  • selector 所在線程 interrupt

5.多線程優(yōu)化

之前講的都是用單線程配合selector來(lái)管理多個(gè)channel上的事件,雖然可行但現(xiàn)在都是多核CPU,其他CPU沒(méi)有被充分利用,也是一種資源浪費(fèi)。并且單線程處理的時(shí)候,如果在處理某件事上耗費(fèi)的時(shí)間過(guò)長(zhǎng),那么對(duì)其他事件的處理也有影響Redis是單線程,底層采用了IO多路復(fù)用和NIO模型類(lèi)似,所以也有這個(gè)缺點(diǎn)。

我們可以分兩組選擇器

BIO、NIO和AIO,IO,nio

  • 單線程配一個(gè)選擇器,專(zhuān)門(mén)處理 accept 事件
  • 創(chuàng)建 cpu 核心數(shù)的線程,每個(gè)線程配一個(gè)選擇器,輪流處理 read 事件

如果selector已經(jīng)處于阻塞狀態(tài)( selector.select() ),則無(wú)法將channel注冊(cè)到selector上。我們?nèi)绻?span style="color:#be191c;">在線程之間傳遞數(shù)據(jù)的話可以用 隊(duì)列。

案例

server

@Slf4j
public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        // 1. 創(chuàng)建固定數(shù)量的 worker 并初始化
        Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
        }
        //計(jì)數(shù)器,第一次 get 是 0
        AtomicInteger index = new AtomicInteger();
        while(true) {
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}", sc.getRemoteAddress());
                    // 2. 關(guān)聯(lián) selector
                    log.debug("before register...{}", sc.getRemoteAddress());
                    // round robin 輪詢
                    workers[index.getAndIncrement() % workers.length].register(sc); // boss 調(diào)用 初始化 selector , 啟動(dòng) worker-0
                    log.debug("after register...{}", sc.getRemoteAddress());
                }
            }
        }
    }
    static class Worker implements Runnable{
        private Thread thread;
        private Selector selector;
        private String name;
        private volatile boolean start = false; // 還未初始化
        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
        public Worker(String name) {
            this.name = name;
        }

        // 初始化線程,和 selector
        public void register(SocketChannel sc) throws IOException { //還是boss線程在調(diào)用
            if(!start) {
                selector = Selector.open();
                thread = new Thread(this, name);
                thread.start();
                start = true;
            }
            //方法一:隊(duì)列傳遞數(shù)據(jù)。向隊(duì)列里添加任務(wù),但這個(gè)任務(wù)并沒(méi)有立即執(zhí)行
            queue.add(()->{
                try {
                    sc.register(selector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            selector.wakeup(); // 喚醒 select 方法
            //方法二:直接使用 wakeup
//            selector.wakeup(); // 喚醒 select 方法
//            sc.register(selector, SelectionKey.OP_READ, null);

        }

        @Override
        public void run() {
            while(true) {
                try {
                    selector.select(); // worker-0  阻塞
                    Runnable task= queue.poll();
                    if(task!=null){
                        task.run();//執(zhí)行   sc.register(selector, SelectionKey.OP_READ, null);
                    }
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isReadable()) {
                            //這里只是粗略的進(jìn)行了讀處理,并未解決消息邊界、客戶端停止連接等問(wèn)題
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            log.debug("read...{}", channel.getRemoteAddress());
                            channel.read(buffer);
                            buffer.flip();
                            debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

client

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        SocketAddress address = sc.getLocalAddress();
        sc.write(Charset.defaultCharset().encode("0123456789abcdef"));
        System.in.read();
    }
}

如何拿到 cpu 個(gè)數(shù)

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因?yàn)槿萜鞑皇俏锢砀綦x的,會(huì)拿到物理 cpu 個(gè)數(shù),而不是容器申請(qǐng)時(shí)的個(gè)數(shù)
  • 這個(gè)問(wèn)題直到 jdk 10 才修復(fù),使用 jvm 參數(shù) UseContainerSupport 配置, 默認(rèn)開(kāi)啟

6.NIO vs BIO

stream vs channel

  • stream 不會(huì)自動(dòng)緩沖數(shù)據(jù),channel 會(huì)利用系統(tǒng)提供的發(fā)送緩沖區(qū)、接收緩沖區(qū)(更為底層)
  • stream 僅支持阻塞 API,channel 同時(shí)支持阻塞、非阻塞 API,網(wǎng)絡(luò) channel 可配合 selector 實(shí)現(xiàn)多路復(fù)用
  • 二者均為全雙工,即讀寫(xiě)可以同時(shí)進(jìn)行

四.AIO

AIO 用來(lái)解決數(shù)據(jù)復(fù)制階段的阻塞問(wèn)題

  • 同步意味著,在進(jìn)行讀寫(xiě)操作時(shí),線程需要等待結(jié)果,還是相當(dāng)于閑置
  • 異步意味著,在進(jìn)行讀寫(xiě)操作時(shí),線程不必等待結(jié)果,而是將來(lái)由操作系統(tǒng)來(lái)通過(guò)回調(diào)方式由另外的線程來(lái)獲得結(jié)果

異步模型需要底層操作系統(tǒng)(Kernel)提供支持文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-661430.html

  • Windows 系統(tǒng)通過(guò) IOCP 實(shí)現(xiàn)了真正的異步 IO
  • Linux 系統(tǒng)異步 IO 在 2.6 版本引入,但其底層實(shí)現(xiàn)還是用多路復(fù)用模擬了異步 IO性能沒(méi)有優(yōu)勢(shì)。而我們的項(xiàng)目最終又需要部署到liunx服務(wù)器上,所以異步IO看起來(lái)很好,但實(shí)際用的很少。

到了這里,關(guān)于BIO、NIO和AIO的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • BIO、NIO、AIO 有什么區(qū)別?

    Java 中的I/O模型主要分為三類(lèi):BIO(Blocking I/O)、NIO(New I/O)和AIO(Asynchronous I/O)。它們?cè)谔幚鞩/O操作時(shí)有著不同的工作方式和特點(diǎn)。 BIO是傳統(tǒng)的I/O模型,也稱為同步I/O。在BIO中,每個(gè)I/O操作都會(huì)阻塞線程,直到數(shù)據(jù)準(zhǔn)備好或者操作完成。這意味著一個(gè)線程只能處理一個(gè)連

    2024年01月16日
    瀏覽(20)
  • java中的BIO NIO AIO

    java中的BIO NIO AIO

    ??????? 多路復(fù)用IO模型是目前使用的比較多的模型。java中的NIO常用的理解是在 網(wǎng)絡(luò)IO中,那么在網(wǎng)絡(luò)IO中為什么NIO比BIO效率更高?我們的web項(xiàng)目中是用的哪種呢?可以往下看。 ????????JavaNIO實(shí)際上就是多路復(fù)用IO。在多路復(fù)用IO模型中,會(huì)有一個(gè)線程不斷地區(qū)輪詢多個(gè)

    2024年02月15日
    瀏覽(27)
  • Java中的BIO、NIO與AIO

    Java中的BIO、NIO與AIO

    ?? I/O 模型簡(jiǎn)單的理解:就是用什么樣的通道進(jìn)行數(shù)據(jù)的發(fā)送和接收,很大程度上決定了程序通信的性能。 Java 共支持 3 種網(wǎng)絡(luò)編程模型 I/O 模式: BIO 、 NIO 、 AIO 。 ?? Java BIO(Blocking I/O) :是傳統(tǒng)的java io 編程,其相關(guān)的類(lèi)和接口在 java.io。同步并阻塞(傳統(tǒng)阻塞型),服

    2024年04月26日
    瀏覽(27)
  • JAVA的BIO、NIO、AIO模式精解(一)

    JAVA的BIO、NIO、AIO模式精解(一)

    在不同系統(tǒng)或進(jìn)程間數(shù)據(jù)交互,或高并發(fā)場(chǎng)景下都選喲網(wǎng)絡(luò)通信。早期是基于性能低下的同步阻塞IO(BIO)實(shí)現(xiàn)。后支持非阻塞IO(NIO)。 前置須知:javsse,java多線程,javaIO,java網(wǎng)絡(luò)模型 目的:局域網(wǎng)內(nèi)通信,多系統(tǒng)間底層消息傳遞機(jī)制,高并發(fā)下大數(shù)據(jù)通信,游戲應(yīng)用。 IO模型

    2023年04月27日
    瀏覽(23)
  • 【Java基礎(chǔ)】BIO/NIO/AIO的詳細(xì)介紹與比較區(qū)分

    【Java基礎(chǔ)】BIO/NIO/AIO的詳細(xì)介紹與比較區(qū)分

    BIO 全稱 Blocking I/O,它是 JDK 1.4 之前的傳統(tǒng)IO模型,是一種同步阻塞的IO,線程發(fā)起 IO 后,一直阻塞,直到緩沖區(qū)數(shù)據(jù)就緒后,在進(jìn)入下一步操作 BIO存在的問(wèn)題: 無(wú)法應(yīng)對(duì)高并發(fā)的場(chǎng)景 連接建立后,當(dāng)前線程沒(méi)有數(shù)據(jù)可讀就會(huì)阻塞,造成資源浪費(fèi) BIO適用場(chǎng)景: 客戶端連接數(shù)

    2024年01月20日
    瀏覽(26)
  • JAVA中三種I/O框架——BIO、NIO、AIO

    JAVA中三種I/O框架——BIO、NIO、AIO

    BIO,同步阻塞IO模型,應(yīng)用程序發(fā)起系統(tǒng)調(diào)用后會(huì)一直等待數(shù)據(jù)的請(qǐng)求,直至內(nèi)核從磁盤(pán)獲取到數(shù)據(jù)并拷貝到用戶空間; 在一般的場(chǎng)景中,多線程模型下的BIO是成本較低、收益較高的方式。但是,如果在高并發(fā)的場(chǎng)景下,過(guò)多的創(chuàng)建線程,會(huì)嚴(yán)重占據(jù)系統(tǒng)資源,降低系統(tǒng)對(duì)外

    2024年02月08日
    瀏覽(18)
  • Java中的三種I/O模型:BIO、NIO和AIO

    I/O(輸入/輸出)操作是任何應(yīng)用程序中必不可少的一部分,它涉及到與文件、網(wǎng)絡(luò)或其他設(shè)備之間的數(shù)據(jù)傳輸。Java提供了幾種不同的I/O模型,其中最常見(jiàn)的是AIO(異步非阻塞I/O)、BIO(阻塞I/O)和NIO(非阻塞I/O)。這些模型在處理I/O操作時(shí)具有不同的工作方式、特性和適用

    2024年02月08日
    瀏覽(22)
  • BIO、NIO、IO多路復(fù)用模型詳細(xì)介紹&Java NIO 網(wǎng)絡(luò)編程

    BIO、NIO、IO多路復(fù)用模型詳細(xì)介紹&Java NIO 網(wǎng)絡(luò)編程

    上文介紹了網(wǎng)絡(luò)編程的基礎(chǔ)知識(shí),并基于 Java 編寫(xiě)了 BIO 的網(wǎng)絡(luò)編程。我們知道 BIO 模型是存在巨大問(wèn)題的,比如 C10K 問(wèn)題,其本質(zhì)就是因其阻塞原因,導(dǎo)致如果想要承受更多的請(qǐng)求就必須有足夠多的線程,但是足夠多的線程會(huì)帶來(lái)內(nèi)存占用問(wèn)題、CPU上下文切換帶來(lái)的性能問(wèn)題

    2024年02月14日
    瀏覽(30)
  • 阻塞非阻塞IO(BIO和NIO),IO多路復(fù)用

    阻塞非阻塞IO(BIO和NIO),IO多路復(fù)用

    1.概念 NIO(New Input/Output)和BIO(Blocking Input/Output)是Java中用于處理輸入輸出的兩種不同的模型。 ? BIO 會(huì) 阻塞 ,等有了消息,立刻返回,一個(gè)線程處理一個(gè)recv(需要很多線程)。 NIO 有沒(méi)有消息,都返回(但程序要自己判斷,返回空就循環(huán)重復(fù));一個(gè)線程可以處理多個(gè)

    2024年02月09日
    瀏覽(21)
  • NIO與BIO

    當(dāng)談到 Java 網(wǎng)絡(luò)編程時(shí),經(jīng)常會(huì)聽(tīng)到兩個(gè)重要的概念:BIO(Blocking I/O,阻塞 I/O)和 NIO(Non-blocking I/O,非阻塞 I/O)。它們都是 Java 中用于處理 I/O 操作的不同編程模型。 BIO 是 Java 最早的 I/O 模型,也是最簡(jiǎn)單的一種。在 BIO 模型中,每個(gè) I/O 操作都會(huì)阻塞當(dāng)前線程,直到數(shù)據(jù)

    2024年04月10日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包