1 非阻塞 vs 阻塞
1.1 阻塞
- 阻塞模式下,相關(guān)方法都會(huì)導(dǎo)致線程暫停
- ServerSocketChannel.accept 會(huì)在沒有連接建立時(shí)讓線程暫停
- SocketChannel.read 會(huì)在沒有數(shù)據(jù)可讀時(shí)讓線程暫停
- 阻塞的表現(xiàn)其實(shí)就是線程暫停了,暫停期間不會(huì)占用 cpu,但線程相當(dāng)于閑置
- 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
- 但多線程下,有新的問題,體現(xiàn)在以下方面
- 32 位 jvm 一個(gè)線程 320k,64 位 jvm 一個(gè)線程 1024k,如果連接數(shù)過多,必然導(dǎo)致 OOM,并且線程太多,反而會(huì)因?yàn)轭l繁上下文切換導(dǎo)致性能降低
- 可以采用線程池技術(shù)來減少線程數(shù)和線程上下文切換,但治標(biāo)不治本,如果有很多連接建立,但長時(shí)間 inactive,會(huì)阻塞線程池中所有線程,因此不適合長連接,只適合短連接
服務(wù)器端
// 使用 nio 來理解阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運(yùn)行
log.debug("connected... {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客戶端發(fā)送的數(shù)據(jù)
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,線程停止運(yùn)行
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
1.2 非阻塞
- 非阻塞模式下,相關(guān)方法都會(huì)不會(huì)讓線程暫停
- 在 ServerSocketChannel.accept 在沒有連接建立時(shí),會(huì)返回 null,繼續(xù)運(yùn)行
- SocketChannel.read 在沒有數(shù)據(jù)可讀時(shí),會(huì)返回 0,但線程不必阻塞,可以去執(zhí)行其它 SocketChannel 的 read 或是去執(zhí)行 ServerSocketChannel.accept
- 寫數(shù)據(jù)時(shí),線程只是等待數(shù)據(jù)寫入 Channel 即可,無需等 Channel 通過網(wǎng)絡(luò)把數(shù)據(jù)發(fā)送出去
- 但非阻塞模式下,即使沒有連接建立,和可讀數(shù)據(jù),線程仍然在不斷運(yùn)行,白白浪費(fèi)了 cpu
- 數(shù)據(jù)復(fù)制過程中,線程實(shí)際還是阻塞的(AIO 改進(jìn)的地方)
服務(wù)器端,客戶端代碼不變
// 使用 nio 來理解非阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
SocketChannel sc = ssc.accept(); // 非阻塞,線程還會(huì)繼續(xù)運(yùn)行,如果沒有連接建立,但sc是null
if (sc != null) {
log.debug("connected... {}", sc);
sc.configureBlocking(false); // 非阻塞模式
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客戶端發(fā)送的數(shù)據(jù)
int read = channel.read(buffer);// 非阻塞,線程仍然會(huì)繼續(xù)運(yùn)行,如果沒有讀到數(shù)據(jù),read 返回 0
if (read > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
}
1.3 多路復(fù)用
單線程可以配合 Selector 完成對多個(gè) Channel 可讀寫事件的監(jiān)控,這稱之為多路復(fù)用
- 多路復(fù)用僅針對網(wǎng)絡(luò) IO、普通文件 IO 沒法利用多路復(fù)用
- 如果不用 Selector 的非阻塞模式,線程大部分時(shí)間都在做無用功,而 Selector 能夠保證
- 有可連接事件時(shí)才去連接
- 有可讀事件才去讀取
- 有可寫事件才去寫入
- 限于網(wǎng)絡(luò)傳輸能力,Channel 未必時(shí)時(shí)可寫,一旦 Channel 可寫,會(huì)觸發(fā) Selector 的可寫事件
2 Selector
好處
- 一個(gè)線程配合 selector 就可以監(jiān)控多個(gè) channel 的事件,事件發(fā)生線程才去處理。避免非阻塞模式下所做無用功
- 讓這個(gè)線程能夠被充分利用
- 節(jié)約了線程的數(shù)量
- 減少了線程上下文切換
2.1 創(chuàng)建
Selector selector = Selector.open();
2.2 綁定 Channel 事件
也稱之為注冊事件,綁定的事件 selector 才會(huì)關(guān)心
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 綁定事件);
- channel 必須工作在非阻塞模式
- FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
- 綁定的事件類型可以有
- connect - 客戶端連接成功時(shí)觸發(fā)
- accept - 服務(wù)器端成功接受連接時(shí)觸發(fā)
- read - 數(shù)據(jù)可讀入時(shí)觸發(fā),有因?yàn)榻邮漳芰θ酰瑪?shù)據(jù)暫不能讀入的情況
- write - 數(shù)據(jù)可寫出時(shí)觸發(fā),有因?yàn)榘l(fā)送能力弱,數(shù)據(jù)暫不能寫出的情況
2.3 監(jiān)聽 Channel 事件
可以通過下面三種方法來監(jiān)聽是否有事件發(fā)生,方法的返回值代表有多少 channel 發(fā)生了事件
方法1,阻塞直到綁定事件發(fā)生
int count = selector.select();
方法2,阻塞直到綁定事件發(fā)生,或是超時(shí)(時(shí)間單位為 ms)
int count = selector.select(long timeout);
方法3,不會(huì)阻塞,也就是不管有沒有事件,立刻返回,自己根據(jù)返回值檢查是否有事件
int count = selector.selectNow();
2.4 select 何時(shí)不阻塞
- 事件發(fā)生時(shí)
- 客戶端發(fā)起連接請求,會(huì)觸發(fā) accept 事件
- 客戶端發(fā)送數(shù)據(jù)過來,客戶端正常、異常關(guān)閉時(shí),都會(huì)觸發(fā) read 事件,另外如果發(fā)送的數(shù)據(jù)大于 buffer 緩沖區(qū),會(huì)觸發(fā)多次讀取事件
- channel 可寫,會(huì)觸發(fā) write 事件
- 在 linux 下 nio bug 發(fā)生時(shí)
- 調(diào)用 selector.wakeup()
- 調(diào)用 selector.close()
- selector 所在線程 interrupt
3 處理 accept 事件
客戶端代碼為
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務(wù)器端代碼為
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 獲取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍歷所有事件,逐一處理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判斷事件類型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必須處理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 處理完畢,必須將事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.1 事件發(fā)生后能否不處理
事件發(fā)生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會(huì)觸發(fā),這是因?yàn)?nio 底層使用的是水平觸發(fā)
4 處理 read 事件
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 獲取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍歷所有事件,逐一處理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判斷事件類型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必須處理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("連接已建立: {}", sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
debug(buffer);
}
}
// 處理完畢,必須將事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
開啟兩個(gè)客戶端,修改一下發(fā)送文字,輸出
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64 |world |
+--------+-------------------------------------------------+----------------+
4.1 為何要 iter.remove()
因?yàn)?select 在事件發(fā)生后,就會(huì)將相關(guān)的 key 放入 selectedKeys 集合,但不會(huì)在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如
- 第一次觸發(fā)了 ssckey 上的 accept 事件,沒有移除 ssckey
- 第二次觸發(fā)了 sckey 上的 read 事件,但這時(shí) selectedKeys 中還有上次的 ssckey ,在處理時(shí)因?yàn)闆]有真正的 serverSocket 連上了,就會(huì)導(dǎo)致空指針異常
4.2 cancel 的作用
cancel 會(huì)取消注冊在 selector 上的 channel,并從 keys 集合中刪除 key 后續(xù)不會(huì)再監(jiān)聽事件
4.3 不處理邊界的問題
以前有同學(xué)寫過這樣的代碼,思考注釋中兩個(gè)問題,以 bio 為例,其實(shí) nio 道理是一樣的
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket ss=new ServerSocket(9000);
while (true) {
Socket s = ss.accept();
InputStream in = s.getInputStream();
// 這里這么寫,有沒有問題
byte[] arr = new byte[4];
while(true) {
int read = in.read(arr);
// 這里這么寫,有沒有問題
if(read == -1) {
break;
}
System.out.println(new String(arr, 0, read));
}
}
}
}
客戶端
public class Client {
public static void main(String[] args) throws IOException {
Socket max = new Socket("localhost", 9000);
OutputStream out = max.getOutputStream();
out.write("hello".getBytes());
out.write("world".getBytes());
out.write("你好".getBytes());
max.close();
}
}
輸出
hell
owor
ld?
?好
為什么?
4.4 處理消息的邊界
- 一種思路是固定消息長度,數(shù)據(jù)包大小一樣,服務(wù)器按預(yù)定長度讀取,缺點(diǎn)是浪費(fèi)帶寬
- 另一種思路是按分隔符拆分,缺點(diǎn)是效率低
- TLV 格式,即 Type 類型、Length 長度、Value 數(shù)據(jù),類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點(diǎn)是 buffer 需要提前分配,如果內(nèi)容過大,則影響 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
服務(wù)器端
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一條完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把這條完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 從 source 讀,向 target 寫
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); // 0123456789abcdef position 16 limit 16
}
public static void main(String[] args) throws IOException {
// 1. 創(chuàng)建 selector, 管理多個(gè) channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立 selector 和 channel 的聯(lián)系(注冊)
// SelectionKey 就是將來事件發(fā)生后,通過它可以知道事件和哪個(gè)channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只關(guān)注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select 方法, 沒有事件發(fā)生,線程阻塞,有事件,線程才會(huì)恢復(fù)運(yùn)行
// select 在事件未處理時(shí),它不會(huì)阻塞, 事件發(fā)生后要么處理,要么取消,不能置之不理
selector.select();
// 4. 處理事件, selectedKeys 內(nèi)部包含了所有發(fā)生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 處理key 時(shí),要從 selectedKeys 集合中刪除,否則下次處理就會(huì)有問題
iter.remove();
log.debug("key: {}", key);
// 5. 區(qū)分事件類型
if (key.isAcceptable()) { // 如果是 accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 將一個(gè) byteBuffer 作為附件關(guān)聯(lián)到 selectionKey 上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) { // 如果是 read
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發(fā)事件的channel
// 獲取 selectionKey 上關(guān)聯(lián)的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常斷開,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 需要擴(kuò)容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因?yàn)榭蛻舳藬嚅_了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)
}
}
}
}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();
4.5 ByteBuffer 大小分配
- 每個(gè) channel 都需要記錄可能被切分的消息,因?yàn)?ByteBuffer 不能被多個(gè) channel 共同使用,因此需要為每個(gè) channel 維護(hù)一個(gè)獨(dú)立的 ByteBuffer
- ByteBuffer 不能太大,比如一個(gè) ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內(nèi)存,因此需要設(shè)計(jì)大小可變的 ByteBuffer
- 一種思路是首先分配一個(gè)較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點(diǎn)是消息連續(xù)容易處理,缺點(diǎn)是數(shù)據(jù)拷貝耗費(fèi)性能,參考實(shí)現(xiàn) http://tutorials.jenkov.com/java-performance/resizable-array.html
- 另一種思路是用多個(gè)數(shù)組組成 buffer,一個(gè)數(shù)組不夠,把多出來的內(nèi)容寫入新的數(shù)組,與前面的區(qū)別是消息存儲(chǔ)不連續(xù)解析復(fù)雜,優(yōu)點(diǎn)是避免了拷貝引起的性能損耗
5 處理 write 事件
5.1 一次無法寫完例子
- 非阻塞模式下,無法保證把 buffer 中所有數(shù)據(jù)都寫入 channel,因此需要追蹤 write 方法的返回值(代表實(shí)際寫入字節(jié)數(shù))
- 用 selector 監(jiān)聽所有 channel 的可寫事件,每個(gè) channel 都需要一個(gè) key 來跟蹤 buffer,但這樣又會(huì)導(dǎo)致占用內(nèi)存過多,就有兩階段策略
- 當(dāng)消息處理器第一次寫入消息時(shí),才將 channel 注冊到 selector 上
- selector 檢查 channel 上的可寫事件,如果所有的數(shù)據(jù)寫完了,就取消 channel 的注冊
- 如果不取消,會(huì)每次可寫均會(huì)觸發(fā) write 事件
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客戶端發(fā)送內(nèi)容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示實(shí)際寫了多少字節(jié)
System.out.println("實(shí)際寫入字節(jié):" + write);
// 4. 如果有剩余未讀字節(jié),才需要關(guān)注寫事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有關(guān)注事件的基礎(chǔ)上,多關(guān)注 寫事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作為附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("實(shí)際寫入字節(jié):" + write);
if (!buffer.hasRemaining()) { // 寫完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}
客戶端
public class WriteClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}
}
}
5.2 write 為何要取消
只要向 channel 發(fā)送數(shù)據(jù)時(shí),socket 緩沖可寫,這個(gè)事件會(huì)頻繁觸發(fā),因此應(yīng)當(dāng)只在 socket 緩沖區(qū)寫不下時(shí)再關(guān)注可寫事件,數(shù)據(jù)寫完之后再取消關(guān)注
6 更進(jìn)一步
6.1 利用多線程優(yōu)化
現(xiàn)在都是多核 cpu,設(shè)計(jì)時(shí)要充分考慮別讓 cpu 的力量被白白浪費(fèi)
前面的代碼只有一個(gè)選擇器,沒有充分利用多核 cpu,如何改進(jìn)呢?
分兩組選擇器
- 單線程配一個(gè)選擇器,專門處理 accept 事件
- 創(chuàng)建 cpu 核心數(shù)的線程,每個(gè)線程配一個(gè)選擇器,輪流處理 read 事件
public class ChannelDemo7 {
public static void main(String[] args) throws IOException {
new BossEventLoop().register();
}
@Slf4j
static class BossEventLoop implements Runnable {
private Selector boss;
private WorkerEventLoop[] workers;
private volatile boolean start = false;
AtomicInteger index = new AtomicInteger();
public void register() throws IOException {
if (!start) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
boss = Selector.open();
SelectionKey ssckey = ssc.register(boss, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
workers = initEventLoops();
new Thread(this, "boss").start();
log.debug("boss start...");
start = true;
}
}
public WorkerEventLoop[] initEventLoops() {
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i);
}
return workerEventLoops;
}
@Override
public void run() {
while (true) {
try {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
log.debug("{} connected", sc.getRemoteAddress());
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Slf4j
static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false;
private int index;
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
public WorkerEventLoop(int index) {
this.index = index;
}
public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start = true;
}
tasks.add(() -> {
try {
SelectionKey sckey = sc.register(worker, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
worker.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
worker.wakeup();
}
@Override
public void run() {
while (true) {
try {
worker.select();
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Set<SelectionKey> keys = worker.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
log.debug("{} message:", sc.getRemoteAddress());
debugAll(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
6.2 如何拿到 cpu 個(gè)數(shù)
- Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因?yàn)槿萜鞑皇俏锢砀綦x的,會(huì)拿到物理 cpu 個(gè)數(shù),而不是容器申請時(shí)的個(gè)數(shù)
- 這個(gè)問題直到 jdk 10 才修復(fù),使用 jvm 參數(shù) UseContainerSupport 配置, 默認(rèn)開啟
7 UDP
- UDP 是無連接的,client 發(fā)送數(shù)據(jù)不會(huì)管 server 是否開啟
- server 這邊的 receive 方法會(huì)將接收到的數(shù)據(jù)存入 byte buffer,但如果數(shù)據(jù)報(bào)文超過 buffer 大小,多出來的數(shù)據(jù)會(huì)被默默拋棄
首先啟動(dòng)服務(wù)器端
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
輸出
waiting...
運(yùn)行客戶端文章來源:http://www.zghlxwxcb.cn/news/detail-454832.html
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}
接下來服務(wù)器端輸出文章來源地址http://www.zghlxwxcb.cn/news/detail-454832.html
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
到了這里,關(guān)于由淺入深Netty基礎(chǔ)知識(shí)NIO網(wǎng)絡(luò)編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!