国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

這篇具有很好參考價值的文章主要介紹了NIO基礎(chǔ) - 網(wǎng)絡(luò)編程。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

non-blocking io 非阻塞 IO

1. 三大組件

1.1 Channel & Buffer

channel 有一點類似于 stream,它就是讀寫數(shù)據(jù)的雙向通道,可以從 channel 將數(shù)據(jù)讀入 buffer,也可以將 buffer 的數(shù)據(jù)寫入 channel,而之前的 stream 要么是輸入,要么是輸出,channel 比 stream 更為底層

常見的 Channel 有

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer 則用來緩沖讀寫數(shù)據(jù),常見的 buffer 有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

1.2 Selector

selector 單從字面意思不好理解,需要結(jié)合服務(wù)器的設(shè)計演化來理解它的用途

多線程版設(shè)計
?? 多線程版缺點
  • 內(nèi)存占用高
  • 線程上下文切換成本高
  • 只適合連接數(shù)少的場景
線程池版設(shè)計
?? 線程池版缺點
  • 阻塞模式下,線程僅能處理一個 socket 連接
  • 僅適合短連接場景
selector 版設(shè)計

selector 的作用就是配合一個線程來管理多個 channel,獲取這些 channel 上發(fā)生的事件,這些 channel 工作在非阻塞模式下,不會讓線程吊死在一個 channel 上。適合連接數(shù)特別多,但流量低的場景(low traffic)

調(diào)用 selector 的 select() 會阻塞直到 channel 發(fā)生了讀寫就緒事件,這些事件發(fā)生,select 方法就會返回這些事件交給 thread 來處理

2. ByteBuffer

有一普通文本文件 data.txt,內(nèi)容為

1234567890abcd

使用 FileChannel 來讀取文件內(nèi)容

@Slf4j
public class ChannelDemo1 {
    public static void main(String[] args) {
        try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {
            FileChannel channel = file.getChannel();
            ByteBuffer buffer = ByteBuffer.allocate(10);
            do {
                // 向 buffer 寫入
                int len = channel.read(buffer);
                log.debug("讀到字節(jié)數(shù):{}", len);
                if (len == -1) {
                    break;
                }
                // 切換 buffer 讀模式
                buffer.flip();
                while(buffer.hasRemaining()) {
                    log.debug("{}", (char)buffer.get());
                }
                // 切換 buffer 寫模式
                buffer.clear();
            } while (true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

輸出

10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):10
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):-1

2.1 ByteBuffer 正確使用姿勢

  1. 向 buffer 寫入數(shù)據(jù),例如調(diào)用 channel.read(buffer)
  2. 調(diào)用 flip() 切換至讀模式
  3. 從 buffer 讀取數(shù)據(jù),例如調(diào)用 buffer.get()
  4. 調(diào)用 clear() 或 compact() 切換至寫模式
  5. 重復(fù) 1~4 步驟

2.2 ByteBuffer 結(jié)構(gòu)

ByteBuffer 有以下重要屬性

  • capacity
  • position
  • limit

一開始

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

寫模式下,position 是寫入位置,limit 等于容量,下圖表示寫入了 4 個字節(jié)后的狀態(tài)

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

flip 動作發(fā)生后,position 切換為讀取位置,limit 切換為讀取限制

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

讀取 4 個字節(jié)后,狀態(tài)

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

clear 動作發(fā)生后,狀態(tài)

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

compact 方法,是把未讀完的部分向前壓縮,然后切換至寫模式

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

?? 調(diào)試工具類
public class ByteBufferUtil {
    private static final char[] BYTE2CHAR = new char[256];
    private static final char[] HEXDUMP_TABLE = new char[256 * 4];
    private static final String[] HEXPADDING = new String[16];
    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
    private static final String[] BYTE2HEX = new String[256];
    private static final String[] BYTEPADDING = new String[16];

    static {
        final char[] DIGITS = "0123456789abcdef".toCharArray();
        for (int i = 0; i < 256; i++) {
            HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
            HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
        }

        int i;

        // Generate the lookup table for hex dump paddings
        for (i = 0; i < HEXPADDING.length; i++) {
            int padding = HEXPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding * 3);
            for (int j = 0; j < padding; j++) {
                buf.append("   ");
            }
            HEXPADDING[i] = buf.toString();
        }

        // Generate the lookup table for the start-offset header in each row (up to 64KiB).
        for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
            StringBuilder buf = new StringBuilder(12);
            buf.append(NEWLINE);
            buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
            buf.setCharAt(buf.length() - 9, '|');
            buf.append('|');
            HEXDUMP_ROWPREFIXES[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-hex-dump conversion
        for (i = 0; i < BYTE2HEX.length; i++) {
            BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
        }

        // Generate the lookup table for byte dump paddings
        for (i = 0; i < BYTEPADDING.length; i++) {
            int padding = BYTEPADDING.length - i;
            StringBuilder buf = new StringBuilder(padding);
            for (int j = 0; j < padding; j++) {
                buf.append(' ');
            }
            BYTEPADDING[i] = buf.toString();
        }

        // Generate the lookup table for byte-to-char conversion
        for (i = 0; i < BYTE2CHAR.length; i++) {
            if (i <= 0x1f || i >= 0x7f) {
                BYTE2CHAR[i] = '.';
            } else {
                BYTE2CHAR[i] = (char) i;
            }
        }
    }

    /**
     * 打印所有內(nèi)容
     * @param buffer
     */
    public static void debugAll(ByteBuffer buffer) {
        int oldlimit = buffer.limit();
        buffer.limit(buffer.capacity());
        StringBuilder origin = new StringBuilder(256);
        appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
        System.out.println("+--------+-------------------- all ------------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
        System.out.println(origin);
        buffer.limit(oldlimit);
    }

    /**
     * 打印可讀取內(nèi)容
     * @param buffer
     */
    public static void debugRead(ByteBuffer buffer) {
        StringBuilder builder = new StringBuilder(256);
        appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
        System.out.println("+--------+-------------------- read -----------------------+----------------+");
        System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
        System.out.println(builder);
    }

    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
        if (isOutOfBounds(offset, length, buf.capacity())) {
            throw new IndexOutOfBoundsException(
                    "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
                            + ") <= " + "buf.capacity(" + buf.capacity() + ')');
        }
        if (length == 0) {
            return;
        }
        dump.append(
                "         +-------------------------------------------------+" +
                        NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                        NEWLINE + "+--------+-------------------------------------------------+----------------+");

        final int startIndex = offset;
        final int fullRows = length >>> 4;
        final int remainder = length & 0xF;

        // Dump the rows which have 16 bytes.
        for (int row = 0; row < fullRows; row++) {
            int rowStartIndex = (row << 4) + startIndex;

            // Per-row prefix.
            appendHexDumpRowPrefix(dump, row, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + 16;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(" |");

            // ASCII dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append('|');
        }

        // Dump the last row which has less than 16 bytes.
        if (remainder != 0) {
            int rowStartIndex = (fullRows << 4) + startIndex;
            appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

            // Hex dump
            int rowEndIndex = rowStartIndex + remainder;
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
            }
            dump.append(HEXPADDING[remainder]);
            dump.append(" |");

            // Ascii dump
            for (int j = rowStartIndex; j < rowEndIndex; j++) {
                dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
            }
            dump.append(BYTEPADDING[remainder]);
            dump.append('|');
        }

        dump.append(NEWLINE +
                "+--------+-------------------------------------------------+----------------+");
    }

    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
        if (row < HEXDUMP_ROWPREFIXES.length) {
            dump.append(HEXDUMP_ROWPREFIXES[row]);
        } else {
            dump.append(NEWLINE);
            dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
            dump.setCharAt(dump.length() - 9, '|');
            dump.append('|');
        }
    }

    public static short getUnsignedByte(ByteBuffer buffer, int index) {
        return (short) (buffer.get(index) & 0xFF);
    }
}

2.3 ByteBuffer 常見方法

分配空間

可以使用 allocate 方法為 ByteBuffer 分配空間,其它 buffer 類也有該方法

Bytebuffer buf = ByteBuffer.allocate(16);
向 buffer 寫入數(shù)據(jù)

有兩種辦法

  • 調(diào)用 channel 的 read 方法
  • 調(diào)用 buffer 自己的 put 方法
int readBytes = channel.read(buf);

buf.put((byte)127);
從 buffer 讀取數(shù)據(jù)

同樣有兩種辦法

  • 調(diào)用 channel 的 write 方法
  • 調(diào)用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);

byte b = buf.get();

get 方法會讓 position 讀指針向后走,如果想重復(fù)讀取數(shù)據(jù)

  • 可以調(diào)用 rewind 方法將 position 重新置為 0
  • 或者調(diào)用 get(int i) 方法獲取索引 i 的內(nèi)容,它不會移動讀指針
mark 和 reset

mark 是在讀取時,做一個標(biāo)記,即使 position 改變,只要調(diào)用 reset 就能回到 mark 的位置

注意

rewind 和 flip 都會清除 mark 位置

字符串與 ByteBuffer 互轉(zhuǎn)
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");

debug(buffer1);
debug(buffer2);

CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd                               |......          |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd                               |......          |
+--------+-------------------------------------------------+----------------+
class java.nio.HeapCharBuffer
你好

?? Buffer 的線程安全

Buffer 是非線程安全的

2.4 Scattering Reads

分散讀取,有一個文本文件 3parts.txt

onetwothree

使用如下方式讀取,可以將數(shù)據(jù)填充至多個 buffer

try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
    FileChannel channel = file.getChannel();
    ByteBuffer a = ByteBuffer.allocate(3);
    ByteBuffer b = ByteBuffer.allocate(3);
    ByteBuffer c = ByteBuffer.allocate(5);
    channel.read(new ByteBuffer[]{a, b, c});
    a.flip();
    b.flip();
    c.flip();
    debug(a);
    debug(b);
    debug(c);
} catch (IOException e) {
    e.printStackTrace();
}

結(jié)果

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6f 6e 65                                        |one             |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 77 6f                                        |two             |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 72 65 65                                  |three           |
+--------+-------------------------------------------------+----------------+

2.5 Gathering Writes

使用如下方式寫入,可以將多個 buffer 的數(shù)據(jù)填充至 channel

try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
    FileChannel channel = file.getChannel();
    ByteBuffer d = ByteBuffer.allocate(4);
    ByteBuffer e = ByteBuffer.allocate(4);
    channel.position(11);

    d.put(new byte[]{'f', 'o', 'u', 'r'});
    e.put(new byte[]{'f', 'i', 'v', 'e'});
    d.flip();
    e.flip();
    debug(d);
    debug(e);
    channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
    e.printStackTrace();
}

輸出

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 6f 75 72                                     |four            |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 69 76 65                                     |five            |
+--------+-------------------------------------------------+----------------+

文件內(nèi)容

onetwothreefourfive

2.6 練習(xí)

網(wǎng)絡(luò)上有多條數(shù)據(jù)發(fā)送給服務(wù)端,數(shù)據(jù)之間使用 \n 進行分隔
但由于某種原因這些數(shù)據(jù)在接收時,被進行了重新組合,例如原始數(shù)據(jù)有3條為

  • Hello,world\n
  • I’m zhangsan\n
  • How are you?\n

變成了下面的兩個 byteBuffer (黏包,半包)

  • Hello,world\nI’m zhangsan\nHo
  • w are you?\n

現(xiàn)在要求你編寫程序,將錯亂的數(shù)據(jù)恢復(fù)成原始的按 \n 分隔的數(shù)據(jù)

public static void main(String[] args) {
    ByteBuffer source = ByteBuffer.allocate(32);
    //                     11            24
    source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
    split(source);

    source.put("w are you?\nhaha!\n".getBytes());
    split(source);
}

private static void split(ByteBuffer source) {
    source.flip();
    int oldLimit = source.limit();
    for (int i = 0; i < oldLimit; i++) {
        if (source.get(i) == '\n') {
            System.out.println(i);
            ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
            // 0 ~ limit
            source.limit(i + 1);
            target.put(source); // 從source 讀,向 target 寫
            debugAll(target);
            source.limit(oldLimit);
        }
    }
    source.compact();
}

3. 文件編程

3.1 FileChannel

?? FileChannel 工作模式

FileChannel 只能工作在阻塞模式下

獲取

不能直接打開 FileChannel,必須通過 FileInputStream、FileOutputStream 或者 RandomAccessFile 來獲取 FileChannel,它們都有 getChannel 方法

  • 通過 FileInputStream 獲取的 channel 只能讀
  • 通過 FileOutputStream 獲取的 channel 只能寫
  • 通過 RandomAccessFile 是否能讀寫根據(jù)構(gòu)造 RandomAccessFile 時的讀寫模式?jīng)Q定
讀取

會從 channel 讀取數(shù)據(jù)填充 ByteBuffer,返回值表示讀到了多少字節(jié),-1 表示到達(dá)了文件的末尾

int readBytes = channel.read(buffer);
寫入

寫入的正確姿勢如下, SocketChannel

ByteBuffer buffer = ...;
buffer.put(...); // 存入數(shù)據(jù)
buffer.flip();   // 切換讀模式

while(buffer.hasRemaining()) {
    channel.write(buffer);
}

在 while 中調(diào)用 channel.write 是因為 write 方法并不能保證一次將 buffer 中的內(nèi)容全部寫入 channel

關(guān)閉

channel 必須關(guān)閉,不過調(diào)用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法會間接地調(diào)用 channel 的 close 方法

位置

獲取當(dāng)前位置

long pos = channel.position();

設(shè)置當(dāng)前位置

long newPos = ...;
channel.position(newPos);

設(shè)置當(dāng)前位置時,如果設(shè)置為文件的末尾

  • 這時讀取會返回 -1
  • 這時寫入,會追加內(nèi)容,但要注意如果 position 超過了文件末尾,再寫入時在新內(nèi)容和原末尾之間會有空洞(00)
大小

使用 size 方法獲取文件的大小

強制寫入

操作系統(tǒng)出于性能的考慮,會將數(shù)據(jù)緩存,不是立刻寫入磁盤??梢哉{(diào)用 force(true) 方法將文件內(nèi)容和元數(shù)據(jù)(文件的權(quán)限等信息)立刻寫入磁盤

3.2 兩個 Channel 傳輸數(shù)據(jù)

String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();
     FileChannel to = new FileOutputStream(TO).getChannel();
    ) {
    from.transferTo(0, from.size(), to);
} catch (IOException e) {
    e.printStackTrace();
}
long end = System.nanoTime();
System.out.println("transferTo 用時:" + (end - start) / 1000_000.0);

輸出

transferTo 用時:8.2011

超過 2g 大小的文件傳輸

public class TestFileChannelTransferTo {
    public static void main(String[] args) {
        try (
                FileChannel from = new FileInputStream("data.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel();
        ) {
            // 效率高,底層會利用操作系統(tǒng)的零拷貝進行優(yōu)化
            long size = from.size();
            // left 變量代表還剩余多少字節(jié)
            for (long left = size; left > 0; ) {
                System.out.println("position:" + (size - left) + " left:" + left);
                left -= from.transferTo((size - left), left, to);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

實際傳輸一個超大文件

position:0 left:7769948160
position:2147483647 left:5622464513
position:4294967294 left:3474980866
position:6442450941 left:1327497219

3.3 Path

jdk7 引入了 Path 和 Paths 類

  • Path 用來表示文件路徑
  • Paths 是工具類,用來獲取 Path 實例
Path source = Paths.get("1.txt"); // 相對路徑 使用 user.dir 環(huán)境變量來定位 1.txt

Path source = Paths.get("d:\\1.txt"); // 絕對路徑 代表了  d:\1.txt

Path source = Paths.get("d:/1.txt"); // 絕對路徑 同樣代表了  d:\1.txt

Path projects = Paths.get("d:\\data", "projects"); // 代表了  d:\data\projects

  • . 代表了當(dāng)前路徑
  • .. 代表了上一級路徑

例如目錄結(jié)構(gòu)如下

d:
	|- data
		|- projects
			|- a
			|- b

代碼

Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路徑

會輸出

d:\data\projects\a\..\b
d:\data\projects\b

3.4 Files

檢查文件是否存在

Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));

創(chuàng)建一級目錄

Path path = Paths.get("helloword/d1");
Files.createDirectory(path);

  • 如果目錄已存在,會拋異常 FileAlreadyExistsException
  • 不能一次創(chuàng)建多級目錄,否則會拋異常 NoSuchFileException

創(chuàng)建多級目錄用

Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

拷貝文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");

Files.copy(source, target);
  • 如果文件已存在,會拋異常 FileAlreadyExistsException

如果希望用 source 覆蓋掉 target,需要用 StandardCopyOption 來控制

Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);

移動文件

Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");

Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE 保證文件移動的原子性

刪除文件

Path target = Paths.get("helloword/target.txt");

Files.delete(target);

  • 如果文件不存在,會拋異常 NoSuchFileException

刪除目錄

Path target = Paths.get("helloword/d1");

Files.delete(target);

  • 如果目錄還有內(nèi)容,會拋異常 DirectoryNotEmptyException

遍歷目錄文件

public static void main(String[] args) throws IOException {
    Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
    AtomicInteger dirCount = new AtomicInteger();
    AtomicInteger fileCount = new AtomicInteger();
    Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) 
            throws IOException {
            System.out.println(dir);
            dirCount.incrementAndGet();
            return super.preVisitDirectory(dir, attrs);
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) 
            throws IOException {
            System.out.println(file);
            fileCount.incrementAndGet();
            return super.visitFile(file, attrs);
        }
    });
    System.out.println(dirCount); // 133
    System.out.println(fileCount); // 1479
}

統(tǒng)計 jar 的數(shù)目

Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) 
        throws IOException {
        if (file.toFile().getName().endsWith(".jar")) {
            fileCount.incrementAndGet();
        }
        return super.visitFile(file, attrs);
    }
});
System.out.println(fileCount); // 724

刪除多級目錄

Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) 
        throws IOException {
        Files.delete(file);
        return super.visitFile(file, attrs);
    }

    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) 
        throws IOException {
        Files.delete(dir);
        return super.postVisitDirectory(dir, exc);
    }
});
?? 刪除很危險

刪除是危險操作,確保要遞歸刪除的文件夾沒有重要內(nèi)容

拷貝多級目錄

long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";

Files.walk(Paths.get(source)).forEach(path -> {
    try {
        String targetName = path.toString().replace(source, target);
        // 是目錄
        if (Files.isDirectory(path)) {
            Files.createDirectory(Paths.get(targetName));
        }
        // 是普通文件
        else if (Files.isRegularFile(path)) {
            Files.copy(path, Paths.get(targetName));
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
});
long end = System.currentTimeMillis();
System.out.println(end - start);

4. 網(wǎng)絡(luò)編程

4.1 非阻塞 vs 阻塞

阻塞
  • 阻塞模式下,相關(guān)方法都會導(dǎo)致線程暫停
    • ServerSocketChannel.accept 會在沒有連接建立時讓線程暫停
    • SocketChannel.read 會在沒有數(shù)據(jù)可讀時讓線程暫停
    • 阻塞的表現(xiàn)其實就是線程暫停了,暫停期間不會占用 cpu,但線程相當(dāng)于閑置
  • 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
  • 但多線程下,有新的問題,體現(xiàn)在以下方面
    • 32 位 jvm 一個線程 320k,64 位 jvm 一個線程 1024k,如果連接數(shù)過多,必然導(dǎo)致 OOM,并且線程太多,反而會因為頻繁上下文切換導(dǎo)致性能降低
    • 可以采用線程池技術(shù)來減少線程數(shù)和線程上下文切換,但治標(biāo)不治本,如果有很多連接建立,但長時間 inactive,會阻塞線程池中所有線程,因此不適合長連接,只適合短連接

服務(wù)器端

// 使用 nio 來理解阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();

// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));

// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
    // 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
    log.debug("connecting...");
    SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運行
    log.debug("connected... {}", sc);
    channels.add(sc);
    for (SocketChannel channel : channels) {
        // 5. 接收客戶端發(fā)送的數(shù)據(jù)
        log.debug("before read... {}", channel);
        channel.read(buffer); // 阻塞方法,線程停止運行
        buffer.flip();
        debugRead(buffer);
        buffer.clear();
        log.debug("after read...{}", channel);
    }
}

客戶端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
非阻塞
  • 非阻塞模式下,相關(guān)方法都會不會讓線程暫停
    • 在 ServerSocketChannel.accept 在沒有連接建立時,會返回 null,繼續(xù)運行
    • SocketChannel.read 在沒有數(shù)據(jù)可讀時,會返回 0,但線程不必阻塞,可以去執(zhí)行其它 SocketChannel 的 read 或是去執(zhí)行 ServerSocketChannel.accept
    • 寫數(shù)據(jù)時,線程只是等待數(shù)據(jù)寫入 Channel 即可,無需等 Channel 通過網(wǎng)絡(luò)把數(shù)據(jù)發(fā)送出去
  • 但非阻塞模式下,即使沒有連接建立,和可讀數(shù)據(jù),線程仍然在不斷運行,白白浪費了 cpu
  • 數(shù)據(jù)復(fù)制過程中,線程實際還是阻塞的(AIO 改進的地方)

服務(wù)器端,客戶端代碼不變

// 使用 nio 來理解非阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
    // 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
    SocketChannel sc = ssc.accept(); // 非阻塞,線程還會繼續(xù)運行,如果沒有連接建立,但sc是null
    if (sc != null) {
        log.debug("connected... {}", sc);
        sc.configureBlocking(false); // 非阻塞模式
        channels.add(sc);
    }
    for (SocketChannel channel : channels) {
        // 5. 接收客戶端發(fā)送的數(shù)據(jù)
        int read = channel.read(buffer);// 非阻塞,線程仍然會繼續(xù)運行,如果沒有讀到數(shù)據(jù),read 返回 0
        if (read > 0) {
            buffer.flip();
            debugRead(buffer);
            buffer.clear();
            log.debug("after read...{}", channel);
        }
    }
}
多路復(fù)用

單線程可以配合 Selector 完成對多個 Channel 可讀寫事件的監(jiān)控,這稱之為多路復(fù)用

  • 多路復(fù)用僅針對網(wǎng)絡(luò) IO、普通文件 IO 沒法利用多路復(fù)用
  • 如果不用 Selector 的非阻塞模式,線程大部分時間都在做無用功,而 Selector 能夠保證
    • 有可連接事件時才去連接
    • 有可讀事件才去讀取
    • 有可寫事件才去寫入
      • 限于網(wǎng)絡(luò)傳輸能力,Channel 未必時時可寫,一旦 Channel 可寫,會觸發(fā) Selector 的可寫事件

4.2 Selector

好處

  • 一個線程配合 selector 就可以監(jiān)控多個 channel 的事件,事件發(fā)生線程才去處理。避免非阻塞模式下所做無用功
  • 讓這個線程能夠被充分利用
  • 節(jié)約了線程的數(shù)量
  • 減少了線程上下文切換
創(chuàng)建
Selector selector = Selector.open();
綁定 Channel 事件

也稱之為注冊事件,綁定的事件 selector 才會關(guān)心

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 綁定事件);
  • channel 必須工作在非阻塞模式
  • FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
  • 綁定的事件類型可以有
    • connect - 客戶端連接成功時觸發(fā)
    • accept - 服務(wù)器端成功接受連接時觸發(fā)
    • read - 數(shù)據(jù)可讀入時觸發(fā),有因為接收能力弱,數(shù)據(jù)暫不能讀入的情況
    • write - 數(shù)據(jù)可寫出時觸發(fā),有因為發(fā)送能力弱,數(shù)據(jù)暫不能寫出的情況
監(jiān)聽 Channel 事件

可以通過下面三種方法來監(jiān)聽是否有事件發(fā)生,方法的返回值代表有多少 channel 發(fā)生了事件

方法1,阻塞直到綁定事件發(fā)生

int count = selector.select();

方法2,阻塞直到綁定事件發(fā)生,或是超時(時間單位為 ms)

int count = selector.select(long timeout);

方法3,不會阻塞,也就是不管有沒有事件,立刻返回,自己根據(jù)返回值檢查是否有事件

int count = selector.selectNow();
?? select 何時不阻塞
  • 事件發(fā)生時
    • 客戶端發(fā)起連接請求,會觸發(fā) accept 事件
    • 客戶端發(fā)送數(shù)據(jù)過來,客戶端正常、異常關(guān)閉時,都會觸發(fā) read 事件,另外如果發(fā)送的數(shù)據(jù)大于 buffer 緩沖區(qū),會觸發(fā)多次讀取事件
    • channel 可寫,會觸發(fā) write 事件
    • 在 linux 下 nio bug 發(fā)生時
  • 調(diào)用 selector.wakeup()
  • 調(diào)用 selector.close()
  • selector 所在線程 interrupt

4.3 處理 accept 事件

客戶端代碼為

public class Client {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 8080)) {
            System.out.println(socket);
            socket.getOutputStream().write("world".getBytes());
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

服務(wù)器端代碼為

@Slf4j
public class ChannelDemo6 {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 獲取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍歷所有事件,逐一處理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判斷事件類型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必須處理
                        SocketChannel sc = c.accept();
                        log.debug("{}", sc);
                    }
                    // 處理完畢,必須將事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
?? 事件發(fā)生后能否不處理

事件發(fā)生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會觸發(fā),這是因為 nio 底層使用的是水平觸發(fā)

4.4 處理 read 事件

@Slf4j
public class ChannelDemo6 {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 獲取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍歷所有事件,逐一處理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判斷事件類型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必須處理
                        SocketChannel sc = c.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        log.debug("連接已建立: {}", sc);
                    } else if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(128);
                        int read = sc.read(buffer);
                        if(read == -1) {
                            key.cancel();
                            sc.close();
                        } else {
                            buffer.flip();
                            debug(buffer);
                        }
                    }
                    // 處理完畢,必須將事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

開啟兩個客戶端,修改一下發(fā)送文字,輸出

sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64                                  |world           |
+--------+-------------------------------------------------+----------------+

?? 為何要 iter.remove()

因為 select 在事件發(fā)生后,就會將相關(guān)的 key 放入 selectedKeys 集合,但不會在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如

  • 第一次觸發(fā)了 ssckey 上的 accept 事件,沒有移除 ssckey
  • 第二次觸發(fā)了 sckey 上的 read 事件,但這時 selectedKeys 中還有上次的 ssckey ,在處理時因為沒有真正的 serverSocket 連上了,就會導(dǎo)致空指針異常
?? cancel 的作用

cancel 會取消注冊在 selector 上的 channel,并從 keys 集合中刪除 key 后續(xù)不會再監(jiān)聽事件

?? 不處理邊界的問題

以前有同學(xué)寫過這樣的代碼,思考注釋中兩個問題,以 bio 為例,其實 nio 道理是一樣的

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket ss=new ServerSocket(9000);
        while (true) {
            Socket s = ss.accept();
            InputStream in = s.getInputStream();
            // 這里這么寫,有沒有問題
            byte[] arr = new byte[4];
            while(true) {
                int read = in.read(arr);
                // 這里這么寫,有沒有問題
                if(read == -1) {
                    break;
                }
                System.out.println(new String(arr, 0, read));
            }
        }
    }
}

客戶端

public class Client {
    public static void main(String[] args) throws IOException {
        Socket max = new Socket("localhost", 9000);
        OutputStream out = max.getOutputStream();
        out.write("hello".getBytes());
        out.write("world".getBytes());
        out.write("你好".getBytes());
        max.close();
    }
}

輸出

hell
owor
ld?
?好

為什么?

處理消息的邊界

NIO基礎(chǔ) - 網(wǎng)絡(luò)編程

  • 一種思路是固定消息長度,數(shù)據(jù)包大小一樣,服務(wù)器按預(yù)定長度讀取,缺點是浪費帶寬
  • 另一種思路是按分隔符拆分,缺點是效率低
  • TLV 格式,即 Type 類型、Length 長度、Value 數(shù)據(jù),類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點是 buffer 需要提前分配,如果內(nèi)容過大,則影響 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

服務(wù)器端

private static void split(ByteBuffer source) {
    source.flip();
    for (int i = 0; i < source.limit(); i++) {
        // 找到一條完整消息
        if (source.get(i) == '\n') {
            int length = i + 1 - source.position();
            // 把這條完整消息存入新的 ByteBuffer
            ByteBuffer target = ByteBuffer.allocate(length);
            // 從 source 讀,向 target 寫
            for (int j = 0; j < length; j++) {
                target.put(source.get());
            }
            debugAll(target);
        }
    }
    source.compact(); // 0123456789abcdef  position 16 limit 16
}

public static void main(String[] args) throws IOException {
    // 1. 創(chuàng)建 selector, 管理多個 channel
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    // 2. 建立 selector 和 channel 的聯(lián)系(注冊)
    // SelectionKey 就是將來事件發(fā)生后,通過它可以知道事件和哪個channel的事件
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // key 只關(guān)注 accept 事件
    sscKey.interestOps(SelectionKey.OP_ACCEPT);
    log.debug("sscKey:{}", sscKey);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
        // 3. select 方法, 沒有事件發(fā)生,線程阻塞,有事件,線程才會恢復(fù)運行
        // select 在事件未處理時,它不會阻塞, 事件發(fā)生后要么處理,要么取消,不能置之不理
        selector.select();
        // 4. 處理事件, selectedKeys 內(nèi)部包含了所有發(fā)生的事件
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // 處理key 時,要從 selectedKeys 集合中刪除,否則下次處理就會有問題
            iter.remove();
            log.debug("key: {}", key);
            // 5. 區(qū)分事件類型
            if (key.isAcceptable()) { // 如果是 accept
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                // 將一個 byteBuffer 作為附件關(guān)聯(lián)到 selectionKey 上
                SelectionKey scKey = sc.register(selector, 0, buffer);
                scKey.interestOps(SelectionKey.OP_READ);
                log.debug("{}", sc);
                log.debug("scKey:{}", scKey);
            } else if (key.isReadable()) { // 如果是 read
                try {
                    SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發(fā)事件的channel
                    // 獲取 selectionKey 上關(guān)聯(lián)的附件
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int read = channel.read(buffer); // 如果是正常斷開,read 的方法的返回值是 -1
                    if(read == -1) {
                        key.cancel();
                    } else {
                        split(buffer);
                        // 需要擴容
                        if (buffer.position() == buffer.limit()) {
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                            buffer.flip();
                            newBuffer.put(buffer); // 0123456789abcdef3333\n
                            key.attach(newBuffer);
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();  // 因為客戶端斷開了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)
                }
            }
        }
    }
}

客戶端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();
ByteBuffer 大小分配
  • 每個 channel 都需要記錄可能被切分的消息,因為 ByteBuffer 不能被多個 channel 共同使用,因此需要為每個 channel 維護一個獨立的 ByteBuffer
  • ByteBuffer 不能太大,比如一個 ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內(nèi)存,因此需要設(shè)計大小可變的 ByteBuffer
    • 一種思路是首先分配一個較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點是消息連續(xù)容易處理,缺點是數(shù)據(jù)拷貝耗費性能,參考實現(xiàn) http://tutorials.jenkov.com/java-performance/resizable-array.html
    • 另一種思路是用多個數(shù)組組成 buffer,一個數(shù)組不夠,把多出來的內(nèi)容寫入新的數(shù)組,與前面的區(qū)別是消息存儲不連續(xù)解析復(fù)雜,優(yōu)點是避免了拷貝引起的性能損耗

4.5 處理 write 事件

一次無法寫完例子
  • 非阻塞模式下,無法保證把 buffer 中所有數(shù)據(jù)都寫入 channel,因此需要追蹤 write 方法的返回值(代表實際寫入字節(jié)數(shù))
  • 用 selector 監(jiān)聽所有 channel 的可寫事件,每個 channel 都需要一個 key 來跟蹤 buffer,但這樣又會導(dǎo)致占用內(nèi)存過多,就有兩階段策略
    • 當(dāng)消息處理器第一次寫入消息時,才將 channel 注冊到 selector 上
    • selector 檢查 channel 上的可寫事件,如果所有的數(shù)據(jù)寫完了,就取消 channel 的注冊
    • 如果不取消,會每次可寫均會觸發(fā) write 事件
public class WriteServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    // 1. 向客戶端發(fā)送內(nèi)容
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    // 3. write 表示實際寫了多少字節(jié)
                    System.out.println("實際寫入字節(jié):" + write);
                    // 4. 如果有剩余未讀字節(jié),才需要關(guān)注寫事件
                    if (buffer.hasRemaining()) {
                        // read 1  write 4
                        // 在原有關(guān)注事件的基礎(chǔ)上,多關(guān)注 寫事件
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // 把 buffer 作為附件加入 sckey
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println("實際寫入字節(jié):" + write);
                    if (!buffer.hasRemaining()) { // 寫完了
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}

客戶端

public class WriteClient {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        sc.connect(new InetSocketAddress("localhost", 8080));
        int count = 0;
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isConnectable()) {
                    System.out.println(sc.finishConnect());
                } else if (key.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                    count += sc.read(buffer);
                    buffer.clear();
                    System.out.println(count);
                }
            }
        }
    }
}
?? write 為何要取消

只要向 channel 發(fā)送數(shù)據(jù)時,socket 緩沖可寫,這個事件會頻繁觸發(fā),因此應(yīng)當(dāng)只在 socket 緩沖區(qū)寫不下時再關(guān)注可寫事件,數(shù)據(jù)寫完之后再取消關(guān)注

4.6 更進一步

?? 利用多線程優(yōu)化

現(xiàn)在都是多核 cpu,設(shè)計時要充分考慮別讓 cpu 的力量被白白浪費

前面的代碼只有一個選擇器,沒有充分利用多核 cpu,如何改進呢?

分兩組選擇器

  • 單線程配一個選擇器,專門處理 accept 事件
  • 創(chuàng)建 cpu 核心數(shù)的線程,每個線程配一個選擇器,輪流處理 read 事件
public class ChannelDemo7 {
    public static void main(String[] args) throws IOException {
        new BossEventLoop().register();
    }


    @Slf4j
    static class BossEventLoop implements Runnable {
        private Selector boss;
        private WorkerEventLoop[] workers;
        private volatile boolean start = false;
        AtomicInteger index = new AtomicInteger();

        public void register() throws IOException {
            if (!start) {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(8080));
                ssc.configureBlocking(false);
                boss = Selector.open();
                SelectionKey ssckey = ssc.register(boss, 0, null);
                ssckey.interestOps(SelectionKey.OP_ACCEPT);
                workers = initEventLoops();
                new Thread(this, "boss").start();
                log.debug("boss start...");
                start = true;
            }
        }

        public WorkerEventLoop[] initEventLoops() {
//        EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
            WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
            for (int i = 0; i < workerEventLoops.length; i++) {
                workerEventLoops[i] = new WorkerEventLoop(i);
            }
            return workerEventLoops;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    boss.select();
                    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            ServerSocketChannel c = (ServerSocketChannel) key.channel();
                            SocketChannel sc = c.accept();
                            sc.configureBlocking(false);
                            log.debug("{} connected", sc.getRemoteAddress());
                            workers[index.getAndIncrement() % workers.length].register(sc);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Slf4j
    static class WorkerEventLoop implements Runnable {
        private Selector worker;
        private volatile boolean start = false;
        private int index;

        private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

        public WorkerEventLoop(int index) {
            this.index = index;
        }

        public void register(SocketChannel sc) throws IOException {
            if (!start) {
                worker = Selector.open();
                new Thread(this, "worker-" + index).start();
                start = true;
            }
            tasks.add(() -> {
                try {
                    SelectionKey sckey = sc.register(worker, 0, null);
                    sckey.interestOps(SelectionKey.OP_READ);
                    worker.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            worker.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    worker.select();
                    Runnable task = tasks.poll();
                    if (task != null) {
                        task.run();
                    }
                    Set<SelectionKey> keys = worker.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(128);
                            try {
                                int read = sc.read(buffer);
                                if (read == -1) {
                                    key.cancel();
                                    sc.close();
                                } else {
                                    buffer.flip();
                                    log.debug("{} message:", sc.getRemoteAddress());
                                    debugAll(buffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                                key.cancel();
                                sc.close();
                            }
                        }
                        iter.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
?? 如何拿到 cpu 個數(shù)
  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因為容器不是物理隔離的,會拿到物理 cpu 個數(shù),而不是容器申請時的個數(shù)
  • 這個問題直到 jdk 10 才修復(fù),使用 jvm 參數(shù) UseContainerSupport 配置, 默認(rèn)開啟

4.7 UDP

  • UDP 是無連接的,client 發(fā)送數(shù)據(jù)不會管 server 是否開啟
  • server 這邊的 receive 方法會將接收到的數(shù)據(jù)存入 byte buffer,但如果數(shù)據(jù)報文超過 buffer 大小,多出來的數(shù)據(jù)會被默默拋棄

首先啟動服務(wù)器端

public class UdpServer {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            channel.socket().bind(new InetSocketAddress(9999));
            System.out.println("waiting...");
            ByteBuffer buffer = ByteBuffer.allocate(32);
            channel.receive(buffer);
            buffer.flip();
            debug(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

輸出

waiting...

運行客戶端

public class UdpClient {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
            InetSocketAddress address = new InetSocketAddress("localhost", 9999);
            channel.send(buffer, address);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接下來服務(wù)器端輸出文章來源地址http://www.zghlxwxcb.cn/news/detail-432170.html

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+

到了這里,關(guān)于NIO基礎(chǔ) - 網(wǎng)絡(luò)編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • NIO-Selector 網(wǎng)絡(luò)編程

    NIO-Selector 網(wǎng)絡(luò)編程

    目錄 一、阻塞 非阻塞 1、阻塞 2、非阻塞 二、selector 1、連接和讀取 2、處理客戶端斷開 3、處理消息的邊界 4、ByteBuffer大小分配 三、多線程優(yōu)化 四、NIO vs BIO 1、stream vs channnel 2、IO模型 阻塞IO 非阻塞IO 多路復(fù)用 異步IO模型 服務(wù)器端的代碼 然后創(chuàng)建客戶端,直接連服務(wù)器端的

    2024年02月12日
    瀏覽(16)
  • Java 網(wǎng)絡(luò)編程之NIO(selector)

    Java 網(wǎng)絡(luò)編程之NIO(selector)

    ? ? ? ? ? ? ? ?? ? ?本編文章意在循環(huán)漸進,可看最后一個就可以了 ? ? ? ? ? ? ? ?Selector selector = Selector.open(); ? ? ? ? ? ? ? ?首先channel必須是非阻塞的情況下 ? ? ? ? ? ? ? ? channel.register(選擇器,操作的類型,綁定的組件);返回的是選擇鍵? ? ? ? ? ? ? 1)Ch

    2023年04月11日
    瀏覽(22)
  • Java網(wǎng)絡(luò)編程-深入理解BIO、NIO

    Java網(wǎng)絡(luò)編程-深入理解BIO、NIO

    BIO BIO 為 Blocked-IO(阻塞 IO),在 JDK1.4 之前建立網(wǎng)絡(luò)連接時,只能使用 BIO 使用 BIO 時,服務(wù)端會對客戶端的每個請求都建立一個線程進行處理,客戶端向服務(wù)端發(fā)送請求后,先咨詢服務(wù)端是否有線程響應(yīng),如果沒有就會等待或者被拒絕 BIO 基本使用代碼: 服務(wù)端: 客戶端:

    2024年02月04日
    瀏覽(24)
  • 10.NIO 網(wǎng)絡(luò)編程應(yīng)用實例-群聊系統(tǒng)

    需求:進一步理解 NIO 非阻塞網(wǎng)絡(luò)編程機制,實現(xiàn)多人群聊 編寫一個 NIO 群聊系統(tǒng),實現(xiàn)客戶端與客戶端的通信需求(非阻塞) 服務(wù)器端:可以監(jiān)測用戶上線,離線,并實現(xiàn)消息轉(zhuǎn)發(fā)功能 客戶端:通過 channel 可以無阻塞發(fā)送消息給其它所有客戶端用戶,同時可以接受其它客戶端用

    2024年02月15日
    瀏覽(44)
  • BIO、NIO、IO多路復(fù)用模型詳細(xì)介紹&Java NIO 網(wǎng)絡(luò)編程

    BIO、NIO、IO多路復(fù)用模型詳細(xì)介紹&Java NIO 網(wǎng)絡(luò)編程

    上文介紹了網(wǎng)絡(luò)編程的基礎(chǔ)知識,并基于 Java 編寫了 BIO 的網(wǎng)絡(luò)編程。我們知道 BIO 模型是存在巨大問題的,比如 C10K 問題,其本質(zhì)就是因其阻塞原因,導(dǎo)致如果想要承受更多的請求就必須有足夠多的線程,但是足夠多的線程會帶來內(nèi)存占用問題、CPU上下文切換帶來的性能問題

    2024年02月14日
    瀏覽(30)
  • Java網(wǎng)絡(luò)編程----通過實現(xiàn)簡易聊天工具來聊聊NIO

    Java網(wǎng)絡(luò)編程----通過實現(xiàn)簡易聊天工具來聊聊NIO

    前文我們說過了BIO,今天我們聊聊NIO。 NIO 是什么?NIO官方解釋它為 New lO ,由于其特性我們也稱之為,Non-Blocking IO。這是jdk1.4之后新增的一套IO標(biāo)準(zhǔn)。 為什么要用NIO呢? 我們再簡單回顧下BIO: 阻塞式IO,原理很簡單,其實就是多個端點與服務(wù)端進行通信時,每個客戶端有一個

    2024年02月05日
    瀏覽(20)
  • Java網(wǎng)絡(luò)編程(二)NIO和Netty實現(xiàn)多人聊天功能
  • 【Netty專題】【網(wǎng)絡(luò)編程】從OSI、TCP/IP網(wǎng)絡(luò)模型開始到BIO、NIO(Netty前置知識)

    【Netty專題】【網(wǎng)絡(luò)編程】從OSI、TCP/IP網(wǎng)絡(luò)模型開始到BIO、NIO(Netty前置知識)

    我是有點怕網(wǎng)絡(luò)編程的,總有點【談網(wǎng)色變】的感覺。為了讓自己不再【談網(wǎng)色變】,所以我想過系統(tǒng)學(xué)習(xí)一下,然后再做個筆記這樣,加深一下理解。但是真要系統(tǒng)學(xué)習(xí),其實還是要花費不少時間的,所以這里也只是簡單的,盡可能地覆蓋一下,梳理一些我認(rèn)為比較迫切需

    2024年02月06日
    瀏覽(28)
  • 【網(wǎng)絡(luò)編程】網(wǎng)絡(luò)基礎(chǔ)

    【網(wǎng)絡(luò)編程】網(wǎng)絡(luò)基礎(chǔ)

    需要云服務(wù)器等云產(chǎn)品來學(xué)習(xí)Linux的同學(xué)可以移步/--騰訊云--/--阿里云--/--華為云--/官網(wǎng),輕量型云服務(wù)器低至112元/年,新用戶首次下單享超低折扣。 ? 目錄 一、協(xié)議分層 1、為什么要分層 2、OSI七層模型 3、TCP/IP四層協(xié)議(五層協(xié)議) 二、網(wǎng)絡(luò)傳輸流程 1、同一個網(wǎng)段內(nèi)的兩

    2024年02月02日
    瀏覽(18)
  • 【網(wǎng)絡(luò)編程】Linux網(wǎng)絡(luò)編程基礎(chǔ)與實戰(zhàn)第三彈——網(wǎng)絡(luò)名詞術(shù)語

    數(shù)據(jù)包從源地址到目的地址所經(jīng)過的路徑,由一系列路由節(jié)點組成。 某個路由節(jié)點為數(shù)據(jù)包選擇投遞方向的選路過程。 路由器工作原理 路由器是連接因特網(wǎng)中各局域網(wǎng)、廣域網(wǎng)的設(shè)備,它會根據(jù)信道的情況自動選擇和設(shè)定路由,以最佳路徑,按前后順序發(fā)送信號的設(shè)備。

    2024年02月08日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包