国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

由淺入深Netty基礎(chǔ)知識(shí)NIO網(wǎng)絡(luò)編程

這篇具有很好參考價(jià)值的文章主要介紹了由淺入深Netty基礎(chǔ)知識(shí)NIO網(wǎng)絡(luò)編程。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。


1 非阻塞 vs 阻塞

由淺入深Netty基礎(chǔ)知識(shí)NIO網(wǎng)絡(luò)編程

1.1 阻塞

  • 阻塞模式下,相關(guān)方法都會(huì)導(dǎo)致線程暫停
    • ServerSocketChannel.accept 會(huì)在沒有連接建立時(shí)讓線程暫停
    • SocketChannel.read 會(huì)在沒有數(shù)據(jù)可讀時(shí)讓線程暫停
    • 阻塞的表現(xiàn)其實(shí)就是線程暫停了,暫停期間不會(huì)占用 cpu,但線程相當(dāng)于閑置
  • 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
  • 但多線程下,有新的問題,體現(xiàn)在以下方面
    • 32 位 jvm 一個(gè)線程 320k,64 位 jvm 一個(gè)線程 1024k,如果連接數(shù)過多,必然導(dǎo)致 OOM,并且線程太多,反而會(huì)因?yàn)轭l繁上下文切換導(dǎo)致性能降低
    • 可以采用線程池技術(shù)來減少線程數(shù)和線程上下文切換,但治標(biāo)不治本,如果有很多連接建立,但長時(shí)間 inactive,會(huì)阻塞線程池中所有線程,因此不適合長連接,只適合短連接

服務(wù)器端

// 使用 nio 來理解阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();

// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));

// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
    // 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
    log.debug("connecting...");
    SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運(yùn)行
    log.debug("connected... {}", sc);
    channels.add(sc);
    for (SocketChannel channel : channels) {
        // 5. 接收客戶端發(fā)送的數(shù)據(jù)
        log.debug("before read... {}", channel);
        channel.read(buffer); // 阻塞方法,線程停止運(yùn)行
        buffer.flip();
        debugRead(buffer);
        buffer.clear();
        log.debug("after read...{}", channel);
    }
}

客戶端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");

1.2 非阻塞

  • 非阻塞模式下,相關(guān)方法都會(huì)不會(huì)讓線程暫停
    • 在 ServerSocketChannel.accept 在沒有連接建立時(shí),會(huì)返回 null,繼續(xù)運(yùn)行
    • SocketChannel.read 在沒有數(shù)據(jù)可讀時(shí),會(huì)返回 0,但線程不必阻塞,可以去執(zhí)行其它 SocketChannel 的 read 或是去執(zhí)行 ServerSocketChannel.accept
    • 寫數(shù)據(jù)時(shí),線程只是等待數(shù)據(jù)寫入 Channel 即可,無需等 Channel 通過網(wǎng)絡(luò)把數(shù)據(jù)發(fā)送出去
  • 但非阻塞模式下,即使沒有連接建立,和可讀數(shù)據(jù),線程仍然在不斷運(yùn)行,白白浪費(fèi)了 cpu
  • 數(shù)據(jù)復(fù)制過程中,線程實(shí)際還是阻塞的(AIO 改進(jìn)的地方)

服務(wù)器端,客戶端代碼不變

// 使用 nio 來理解非阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
    // 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
    SocketChannel sc = ssc.accept(); // 非阻塞,線程還會(huì)繼續(xù)運(yùn)行,如果沒有連接建立,但sc是null
    if (sc != null) {
        log.debug("connected... {}", sc);
        sc.configureBlocking(false); // 非阻塞模式
        channels.add(sc);
    }
    for (SocketChannel channel : channels) {
        // 5. 接收客戶端發(fā)送的數(shù)據(jù)
        int read = channel.read(buffer);// 非阻塞,線程仍然會(huì)繼續(xù)運(yùn)行,如果沒有讀到數(shù)據(jù),read 返回 0
        if (read > 0) {
            buffer.flip();
            debugRead(buffer);
            buffer.clear();
            log.debug("after read...{}", channel);
        }
    }
}

1.3 多路復(fù)用

單線程可以配合 Selector 完成對多個(gè) Channel 可讀寫事件的監(jiān)控,這稱之為多路復(fù)用

  • 多路復(fù)用僅針對網(wǎng)絡(luò) IO、普通文件 IO 沒法利用多路復(fù)用
  • 如果不用 Selector 的非阻塞模式,線程大部分時(shí)間都在做無用功,而 Selector 能夠保證
    • 有可連接事件時(shí)才去連接
    • 有可讀事件才去讀取
    • 有可寫事件才去寫入
      • 限于網(wǎng)絡(luò)傳輸能力,Channel 未必時(shí)時(shí)可寫,一旦 Channel 可寫,會(huì)觸發(fā) Selector 的可寫事件

2 Selector

好處

  • 一個(gè)線程配合 selector 就可以監(jiān)控多個(gè) channel 的事件,事件發(fā)生線程才去處理。避免非阻塞模式下所做無用功
  • 讓這個(gè)線程能夠被充分利用
  • 節(jié)約了線程的數(shù)量
  • 減少了線程上下文切換

2.1 創(chuàng)建

Selector selector = Selector.open();

2.2 綁定 Channel 事件

也稱之為注冊事件,綁定的事件 selector 才會(huì)關(guān)心

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 綁定事件);
  • channel 必須工作在非阻塞模式
  • FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
  • 綁定的事件類型可以有
    • connect - 客戶端連接成功時(shí)觸發(fā)
    • accept - 服務(wù)器端成功接受連接時(shí)觸發(fā)
    • read - 數(shù)據(jù)可讀入時(shí)觸發(fā),有因?yàn)榻邮漳芰θ酰瑪?shù)據(jù)暫不能讀入的情況
    • write - 數(shù)據(jù)可寫出時(shí)觸發(fā),有因?yàn)榘l(fā)送能力弱,數(shù)據(jù)暫不能寫出的情況

2.3 監(jiān)聽 Channel 事件

可以通過下面三種方法來監(jiān)聽是否有事件發(fā)生,方法的返回值代表有多少 channel 發(fā)生了事件

方法1,阻塞直到綁定事件發(fā)生

int count = selector.select();

方法2,阻塞直到綁定事件發(fā)生,或是超時(shí)(時(shí)間單位為 ms)

int count = selector.select(long timeout);

方法3,不會(huì)阻塞,也就是不管有沒有事件,立刻返回,自己根據(jù)返回值檢查是否有事件

int count = selector.selectNow();

2.4 select 何時(shí)不阻塞

  • 事件發(fā)生時(shí)
    • 客戶端發(fā)起連接請求,會(huì)觸發(fā) accept 事件
    • 客戶端發(fā)送數(shù)據(jù)過來,客戶端正常、異常關(guān)閉時(shí),都會(huì)觸發(fā) read 事件,另外如果發(fā)送的數(shù)據(jù)大于 buffer 緩沖區(qū),會(huì)觸發(fā)多次讀取事件
    • channel 可寫,會(huì)觸發(fā) write 事件
    • 在 linux 下 nio bug 發(fā)生時(shí)
  • 調(diào)用 selector.wakeup()
  • 調(diào)用 selector.close()
  • selector 所在線程 interrupt

3 處理 accept 事件

客戶端代碼為

public class Client {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 8080)) {
            System.out.println(socket);
            socket.getOutputStream().write("world".getBytes());
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

服務(wù)器端代碼為

@Slf4j
public class ChannelDemo6 {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 獲取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍歷所有事件,逐一處理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判斷事件類型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必須處理
                        SocketChannel sc = c.accept();
                        log.debug("{}", sc);
                    }
                    // 處理完畢,必須將事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3.1 事件發(fā)生后能否不處理

事件發(fā)生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會(huì)觸發(fā),這是因?yàn)?nio 底層使用的是水平觸發(fā)

4 處理 read 事件

@Slf4j
public class ChannelDemo6 {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
//                int count = selector.selectNow();
                log.debug("select count: {}", count);
//                if(count <= 0) {
//                    continue;
//                }

                // 獲取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍歷所有事件,逐一處理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判斷事件類型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必須處理
                        SocketChannel sc = c.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        log.debug("連接已建立: {}", sc);
                    } else if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(128);
                        int read = sc.read(buffer);
                        if(read == -1) {
                            key.cancel();
                            sc.close();
                        } else {
                            buffer.flip();
                            debug(buffer);
                        }
                    }
                    // 處理完畢,必須將事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

開啟兩個(gè)客戶端,修改一下發(fā)送文字,輸出

sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64                                  |world           |
+--------+-------------------------------------------------+----------------+

4.1 為何要 iter.remove()

因?yàn)?select 在事件發(fā)生后,就會(huì)將相關(guān)的 key 放入 selectedKeys 集合,但不會(huì)在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如

  • 第一次觸發(fā)了 ssckey 上的 accept 事件,沒有移除 ssckey
  • 第二次觸發(fā)了 sckey 上的 read 事件,但這時(shí) selectedKeys 中還有上次的 ssckey ,在處理時(shí)因?yàn)闆]有真正的 serverSocket 連上了,就會(huì)導(dǎo)致空指針異常

4.2 cancel 的作用

cancel 會(huì)取消注冊在 selector 上的 channel,并從 keys 集合中刪除 key 后續(xù)不會(huì)再監(jiān)聽事件

4.3 不處理邊界的問題

以前有同學(xué)寫過這樣的代碼,思考注釋中兩個(gè)問題,以 bio 為例,其實(shí) nio 道理是一樣的

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket ss=new ServerSocket(9000);
        while (true) {
            Socket s = ss.accept();
            InputStream in = s.getInputStream();
            // 這里這么寫,有沒有問題
            byte[] arr = new byte[4];
            while(true) {
                int read = in.read(arr);
                // 這里這么寫,有沒有問題
                if(read == -1) {
                    break;
                }
                System.out.println(new String(arr, 0, read));
            }
        }
    }
}

客戶端

public class Client {
    public static void main(String[] args) throws IOException {
        Socket max = new Socket("localhost", 9000);
        OutputStream out = max.getOutputStream();
        out.write("hello".getBytes());
        out.write("world".getBytes());
        out.write("你好".getBytes());
        max.close();
    }
}

輸出

hell
owor
ld?
?好

為什么?

4.4 處理消息的邊界

由淺入深Netty基礎(chǔ)知識(shí)NIO網(wǎng)絡(luò)編程

  • 一種思路是固定消息長度,數(shù)據(jù)包大小一樣,服務(wù)器按預(yù)定長度讀取,缺點(diǎn)是浪費(fèi)帶寬
  • 另一種思路是按分隔符拆分,缺點(diǎn)是效率低
  • TLV 格式,即 Type 類型、Length 長度、Value 數(shù)據(jù),類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點(diǎn)是 buffer 需要提前分配,如果內(nèi)容過大,則影響 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

服務(wù)器端

private static void split(ByteBuffer source) {
    source.flip();
    for (int i = 0; i < source.limit(); i++) {
        // 找到一條完整消息
        if (source.get(i) == '\n') {
            int length = i + 1 - source.position();
            // 把這條完整消息存入新的 ByteBuffer
            ByteBuffer target = ByteBuffer.allocate(length);
            // 從 source 讀,向 target 寫
            for (int j = 0; j < length; j++) {
                target.put(source.get());
            }
            debugAll(target);
        }
    }
    source.compact(); // 0123456789abcdef  position 16 limit 16
}

public static void main(String[] args) throws IOException {
    // 1. 創(chuàng)建 selector, 管理多個(gè) channel
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    // 2. 建立 selector 和 channel 的聯(lián)系(注冊)
    // SelectionKey 就是將來事件發(fā)生后,通過它可以知道事件和哪個(gè)channel的事件
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // key 只關(guān)注 accept 事件
    sscKey.interestOps(SelectionKey.OP_ACCEPT);
    log.debug("sscKey:{}", sscKey);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
        // 3. select 方法, 沒有事件發(fā)生,線程阻塞,有事件,線程才會(huì)恢復(fù)運(yùn)行
        // select 在事件未處理時(shí),它不會(huì)阻塞, 事件發(fā)生后要么處理,要么取消,不能置之不理
        selector.select();
        // 4. 處理事件, selectedKeys 內(nèi)部包含了所有發(fā)生的事件
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // 處理key 時(shí),要從 selectedKeys 集合中刪除,否則下次處理就會(huì)有問題
            iter.remove();
            log.debug("key: {}", key);
            // 5. 區(qū)分事件類型
            if (key.isAcceptable()) { // 如果是 accept
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                // 將一個(gè) byteBuffer 作為附件關(guān)聯(lián)到 selectionKey 上
                SelectionKey scKey = sc.register(selector, 0, buffer);
                scKey.interestOps(SelectionKey.OP_READ);
                log.debug("{}", sc);
                log.debug("scKey:{}", scKey);
            } else if (key.isReadable()) { // 如果是 read
                try {
                    SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發(fā)事件的channel
                    // 獲取 selectionKey 上關(guān)聯(lián)的附件
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int read = channel.read(buffer); // 如果是正常斷開,read 的方法的返回值是 -1
                    if(read == -1) {
                        key.cancel();
                    } else {
                        split(buffer);
                        // 需要擴(kuò)容
                        if (buffer.position() == buffer.limit()) {
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                            buffer.flip();
                            newBuffer.put(buffer); // 0123456789abcdef3333\n
                            key.attach(newBuffer);
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();  // 因?yàn)榭蛻舳藬嚅_了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)
                }
            }
        }
    }
}

客戶端

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();

4.5 ByteBuffer 大小分配

  • 每個(gè) channel 都需要記錄可能被切分的消息,因?yàn)?ByteBuffer 不能被多個(gè) channel 共同使用,因此需要為每個(gè) channel 維護(hù)一個(gè)獨(dú)立的 ByteBuffer
  • ByteBuffer 不能太大,比如一個(gè) ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內(nèi)存,因此需要設(shè)計(jì)大小可變的 ByteBuffer
    • 一種思路是首先分配一個(gè)較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點(diǎn)是消息連續(xù)容易處理,缺點(diǎn)是數(shù)據(jù)拷貝耗費(fèi)性能,參考實(shí)現(xiàn) http://tutorials.jenkov.com/java-performance/resizable-array.html
    • 另一種思路是用多個(gè)數(shù)組組成 buffer,一個(gè)數(shù)組不夠,把多出來的內(nèi)容寫入新的數(shù)組,與前面的區(qū)別是消息存儲(chǔ)不連續(xù)解析復(fù)雜,優(yōu)點(diǎn)是避免了拷貝引起的性能損耗

5 處理 write 事件

5.1 一次無法寫完例子

  • 非阻塞模式下,無法保證把 buffer 中所有數(shù)據(jù)都寫入 channel,因此需要追蹤 write 方法的返回值(代表實(shí)際寫入字節(jié)數(shù))
  • 用 selector 監(jiān)聽所有 channel 的可寫事件,每個(gè) channel 都需要一個(gè) key 來跟蹤 buffer,但這樣又會(huì)導(dǎo)致占用內(nèi)存過多,就有兩階段策略
    • 當(dāng)消息處理器第一次寫入消息時(shí),才將 channel 注冊到 selector 上
    • selector 檢查 channel 上的可寫事件,如果所有的數(shù)據(jù)寫完了,就取消 channel 的注冊
    • 如果不取消,會(huì)每次可寫均會(huì)觸發(fā) write 事件
public class WriteServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    // 1. 向客戶端發(fā)送內(nèi)容
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    // 3. write 表示實(shí)際寫了多少字節(jié)
                    System.out.println("實(shí)際寫入字節(jié):" + write);
                    // 4. 如果有剩余未讀字節(jié),才需要關(guān)注寫事件
                    if (buffer.hasRemaining()) {
                        // read 1  write 4
                        // 在原有關(guān)注事件的基礎(chǔ)上,多關(guān)注 寫事件
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // 把 buffer 作為附件加入 sckey
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println("實(shí)際寫入字節(jié):" + write);
                    if (!buffer.hasRemaining()) { // 寫完了
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}

客戶端

public class WriteClient {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        sc.connect(new InetSocketAddress("localhost", 8080));
        int count = 0;
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isConnectable()) {
                    System.out.println(sc.finishConnect());
                } else if (key.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                    count += sc.read(buffer);
                    buffer.clear();
                    System.out.println(count);
                }
            }
        }
    }
}

5.2 write 為何要取消

只要向 channel 發(fā)送數(shù)據(jù)時(shí),socket 緩沖可寫,這個(gè)事件會(huì)頻繁觸發(fā),因此應(yīng)當(dāng)只在 socket 緩沖區(qū)寫不下時(shí)再關(guān)注可寫事件,數(shù)據(jù)寫完之后再取消關(guān)注

6 更進(jìn)一步

6.1 利用多線程優(yōu)化

現(xiàn)在都是多核 cpu,設(shè)計(jì)時(shí)要充分考慮別讓 cpu 的力量被白白浪費(fèi)

前面的代碼只有一個(gè)選擇器,沒有充分利用多核 cpu,如何改進(jìn)呢?

分兩組選擇器

  • 單線程配一個(gè)選擇器,專門處理 accept 事件
  • 創(chuàng)建 cpu 核心數(shù)的線程,每個(gè)線程配一個(gè)選擇器,輪流處理 read 事件
public class ChannelDemo7 {
    public static void main(String[] args) throws IOException {
        new BossEventLoop().register();
    }


    @Slf4j
    static class BossEventLoop implements Runnable {
        private Selector boss;
        private WorkerEventLoop[] workers;
        private volatile boolean start = false;
        AtomicInteger index = new AtomicInteger();

        public void register() throws IOException {
            if (!start) {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(8080));
                ssc.configureBlocking(false);
                boss = Selector.open();
                SelectionKey ssckey = ssc.register(boss, 0, null);
                ssckey.interestOps(SelectionKey.OP_ACCEPT);
                workers = initEventLoops();
                new Thread(this, "boss").start();
                log.debug("boss start...");
                start = true;
            }
        }

        public WorkerEventLoop[] initEventLoops() {
//        EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
            WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
            for (int i = 0; i < workerEventLoops.length; i++) {
                workerEventLoops[i] = new WorkerEventLoop(i);
            }
            return workerEventLoops;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    boss.select();
                    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            ServerSocketChannel c = (ServerSocketChannel) key.channel();
                            SocketChannel sc = c.accept();
                            sc.configureBlocking(false);
                            log.debug("{} connected", sc.getRemoteAddress());
                            workers[index.getAndIncrement() % workers.length].register(sc);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Slf4j
    static class WorkerEventLoop implements Runnable {
        private Selector worker;
        private volatile boolean start = false;
        private int index;

        private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();

        public WorkerEventLoop(int index) {
            this.index = index;
        }

        public void register(SocketChannel sc) throws IOException {
            if (!start) {
                worker = Selector.open();
                new Thread(this, "worker-" + index).start();
                start = true;
            }
            tasks.add(() -> {
                try {
                    SelectionKey sckey = sc.register(worker, 0, null);
                    sckey.interestOps(SelectionKey.OP_READ);
                    worker.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            worker.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    worker.select();
                    Runnable task = tasks.poll();
                    if (task != null) {
                        task.run();
                    }
                    Set<SelectionKey> keys = worker.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(128);
                            try {
                                int read = sc.read(buffer);
                                if (read == -1) {
                                    key.cancel();
                                    sc.close();
                                } else {
                                    buffer.flip();
                                    log.debug("{} message:", sc.getRemoteAddress());
                                    debugAll(buffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                                key.cancel();
                                sc.close();
                            }
                        }
                        iter.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

6.2 如何拿到 cpu 個(gè)數(shù)

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因?yàn)槿萜鞑皇俏锢砀綦x的,會(huì)拿到物理 cpu 個(gè)數(shù),而不是容器申請時(shí)的個(gè)數(shù)
  • 這個(gè)問題直到 jdk 10 才修復(fù),使用 jvm 參數(shù) UseContainerSupport 配置, 默認(rèn)開啟

7 UDP

  • UDP 是無連接的,client 發(fā)送數(shù)據(jù)不會(huì)管 server 是否開啟
  • server 這邊的 receive 方法會(huì)將接收到的數(shù)據(jù)存入 byte buffer,但如果數(shù)據(jù)報(bào)文超過 buffer 大小,多出來的數(shù)據(jù)會(huì)被默默拋棄

首先啟動(dòng)服務(wù)器端

public class UdpServer {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            channel.socket().bind(new InetSocketAddress(9999));
            System.out.println("waiting...");
            ByteBuffer buffer = ByteBuffer.allocate(32);
            channel.receive(buffer);
            buffer.flip();
            debug(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

輸出

waiting...

運(yùn)行客戶端

public class UdpClient {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
            InetSocketAddress address = new InetSocketAddress("localhost", 9999);
            channel.send(buffer, address);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接下來服務(wù)器端輸出文章來源地址http://www.zghlxwxcb.cn/news/detail-454832.html

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+

到了這里,關(guān)于由淺入深Netty基礎(chǔ)知識(shí)NIO網(wǎng)絡(luò)編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 由淺入深講MySQL數(shù)據(jù)庫之MySQL的基礎(chǔ)與介紹

    由淺入深講MySQL數(shù)據(jù)庫之MySQL的基礎(chǔ)與介紹

    從今天開始, 我們就帶各位小伙伴學(xué)習(xí)數(shù)據(jù)庫技術(shù)。數(shù)據(jù)庫技術(shù)是Java開發(fā)中必不可少的一部分知識(shí)內(nèi)容。也是非常重要的技術(shù)。本系列教程由淺入深, 全面講解數(shù)據(jù)庫體系。 非常適合零基礎(chǔ)的小伙伴來學(xué)習(xí)。 全文大約 【1900】字 ,不說廢話,只講可以讓你學(xué)到技術(shù)、明白原理

    2024年02月05日
    瀏覽(23)
  • Python+大數(shù)據(jù)-大學(xué)生精通Python從由淺入深(Python基礎(chǔ)篇)

    Python+大數(shù)據(jù)-大學(xué)生精通Python從由淺入深(Python基礎(chǔ)篇)

    看到這位 頭發(fā)濃密 大叔了嗎!1989年,為了打發(fā)圣誕節(jié)假期,龜叔(吉多·范·羅蘇姆)開始寫Python語言的編譯器 。 1991年 ,第一個(gè)Python編譯器誕生 Python這個(gè)名字,來自龜叔所摯愛的電視劇Monty Python’s Flying Circus (蒙蒂·蟒蛇的飛行馬戲團(tuán)) 我們?yōu)槭裁匆獙W(xué)習(xí)這一項(xiàng)語言,Pytho

    2024年02月13日
    瀏覽(26)
  • Docker由淺入深(一)

    容器化技術(shù)介紹 介紹容器化之前,我們得先知道,為什么會(huì)出現(xiàn)容器化,容器化之前都經(jīng)歷了什么 物理機(jī)時(shí)代 部署非常慢 成功很高 浪費(fèi)資源 難于擴(kuò)展與遷移 受制于硬件 虛擬化時(shí)代 在同一個(gè)物理機(jī)上安裝多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)安裝操作系統(tǒng)和應(yīng)用, 虛擬機(jī)之間物理資源

    2024年02月03日
    瀏覽(39)
  • 由淺入深了解HashMap源碼

    由淺入深了解HashMap源碼

    ? ? ? ?由經(jīng)典面試題引入,講解一下HashMap的底層數(shù)據(jù)結(jié)構(gòu)?這個(gè)面試題你當(dāng)然可以只答,HashMap底層的數(shù)據(jù)結(jié)構(gòu)是由(數(shù)組+鏈表+紅黑樹)實(shí)現(xiàn)的,但是顯然面試官不太滿意這個(gè)答案,畢竟這里有一個(gè)坑需要你去填,那就是在回答HashMap的底層數(shù)據(jù)結(jié)構(gòu)時(shí)需要考慮JDK的版本,因

    2023年04月13日
    瀏覽(28)
  • React - redux 使用(由淺入深)

    React - redux 使用(由淺入深)

    中文文檔: http://www.redux.org.cn/ 英文文檔: https://redux.js.org/ Github: https://github.com/reactjs/redux 可直接參照 目錄十 進(jìn)行使用 react-redux redux 是一個(gè)專門用于做狀態(tài)管理的JS庫(不是react插件庫)。 它可以用在 react, angular, vue 等項(xiàng)目中, 但基本與 react 配合使用。 作用: 集中式管理 re

    2024年02月07日
    瀏覽(24)
  • 【個(gè)人筆記】由淺入深分析 ClickHouse

    項(xiàng)目中不少地方使用到ClickHouse,就對它做了一個(gè)相對深入一點(diǎn)的了解和研究。并對各種知識(shí)點(diǎn)及整理過程中的一些理解心得進(jìn)行了匯總并分享出來,希望對其他同學(xué)能有幫助。 本文主要講解ClickHouse的特點(diǎn)、讀寫過程、存儲(chǔ)形式、索引、引擎、物化視圖等特性。 適合 入門和

    2024年01月20日
    瀏覽(29)
  • 由淺入深理解C#中的事件

    本文較長,給大家提供了目錄,可以直接看自己感興趣的部分。 前面介紹了C#中的委托,事件的很多部分都與委托類似。實(shí)際上,事件就像是專門用于某種特殊用途的簡單委托,事件包含了一個(gè)私有的委托,如下圖所示: 有關(guān)事件的私有委托需要了解的重要事項(xiàng)如下: 1、事

    2024年02月03日
    瀏覽(30)
  • Springboot3+EasyExcel由淺入深

    Springboot3+EasyExcel由淺入深

    環(huán)境介紹 技術(shù)棧 springboot3+easyexcel 軟件 版本 IDEA IntelliJ IDEA 2022.2.1 JDK 17 Spring Boot 3 EasyExcel是一個(gè)基于Java的、快速、簡潔、解決大文件內(nèi)存溢出的Excel處理工具。 他能讓你在不用考慮性能、內(nèi)存的等因素的情況下,快速完成Excel的讀、寫等功能。 官網(wǎng)https://easyexcel.opensource.ali

    2024年01月16日
    瀏覽(28)
  • 【由淺入深學(xué)習(xí)MySQL】之索引進(jìn)階

    【由淺入深學(xué)習(xí)MySQL】之索引進(jìn)階

    本系列為:MySQL數(shù)據(jù)庫詳解,為千鋒資深教學(xué)老師獨(dú)家創(chuàng)作 致力于為大家講解清晰MySQL數(shù)據(jù)庫相關(guān)知識(shí)點(diǎn),含有豐富的代碼案例及講解。如果感覺對大家有幫助的話,可以【關(guān)注】持續(xù)追更~ 文末有本文重點(diǎn)總結(jié),技術(shù)類問題,也歡迎大家和我們溝通交流! 從今天開始本系列

    2024年02月05日
    瀏覽(22)
  • 手拉手Vue組件由淺入深

    手拉手Vue組件由淺入深

    組件 (Component) 是 Vue.js 最強(qiáng)大的功能之一,它是html、css、js等的一個(gè)聚合體,封裝性和隔離性非常強(qiáng)。 組件化開發(fā): ??? 1、將一個(gè)具備完整功能的項(xiàng)目的一部分分割多處使用 ??? 2、加快項(xiàng)目的進(jìn)度 ??? 3、可以進(jìn)行項(xiàng)目的復(fù)用 組件注冊分為:全局注冊和局部注冊 目錄

    2024年01月18日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包