non-blocking io 非阻塞 IO
1. 三大組件
1.1 Channel & Buffer
channel 有一點類似于 stream,它就是讀寫數(shù)據(jù)的雙向通道,可以從 channel 將數(shù)據(jù)讀入 buffer,也可以將 buffer 的數(shù)據(jù)寫入 channel,而之前的 stream 要么是輸入,要么是輸出,channel 比 stream 更為底層
常見的 Channel 有
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
buffer 則用來緩沖讀寫數(shù)據(jù),常見的 buffer 有
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
1.2 Selector
selector 單從字面意思不好理解,需要結(jié)合服務(wù)器的設(shè)計演化來理解它的用途
多線程版設(shè)計
?? 多線程版缺點
- 內(nèi)存占用高
- 線程上下文切換成本高
- 只適合連接數(shù)少的場景
線程池版設(shè)計
?? 線程池版缺點
- 阻塞模式下,線程僅能處理一個 socket 連接
- 僅適合短連接場景
selector 版設(shè)計
selector 的作用就是配合一個線程來管理多個 channel,獲取這些 channel 上發(fā)生的事件,這些 channel 工作在非阻塞模式下,不會讓線程吊死在一個 channel 上。適合連接數(shù)特別多,但流量低的場景(low traffic)
調(diào)用 selector 的 select() 會阻塞直到 channel 發(fā)生了讀寫就緒事件,這些事件發(fā)生,select 方法就會返回這些事件交給 thread 來處理
2. ByteBuffer
有一普通文本文件 data.txt,內(nèi)容為
1234567890abcd
使用 FileChannel 來讀取文件內(nèi)容
@Slf4j
public class ChannelDemo1 {
public static void main(String[] args) {
try (RandomAccessFile file = new RandomAccessFile("helloword/data.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(10);
do {
// 向 buffer 寫入
int len = channel.read(buffer);
log.debug("讀到字節(jié)數(shù):{}", len);
if (len == -1) {
break;
}
// 切換 buffer 讀模式
buffer.flip();
while(buffer.hasRemaining()) {
log.debug("{}", (char)buffer.get());
}
// 切換 buffer 寫模式
buffer.clear();
} while (true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
輸出
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):10
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 1
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 2
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 3
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 5
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 6
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 7
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 8
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 9
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 0
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):4
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - a
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - b
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - c
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - d
10:39:03 [DEBUG] [main] c.i.n.ChannelDemo1 - 讀到字節(jié)數(shù):-1
2.1 ByteBuffer 正確使用姿勢
- 向 buffer 寫入數(shù)據(jù),例如調(diào)用 channel.read(buffer)
- 調(diào)用 flip() 切換至讀模式
- 從 buffer 讀取數(shù)據(jù),例如調(diào)用 buffer.get()
- 調(diào)用 clear() 或 compact() 切換至寫模式
- 重復(fù) 1~4 步驟
2.2 ByteBuffer 結(jié)構(gòu)
ByteBuffer 有以下重要屬性
- capacity
- position
- limit
一開始
寫模式下,position 是寫入位置,limit 等于容量,下圖表示寫入了 4 個字節(jié)后的狀態(tài)
flip 動作發(fā)生后,position 切換為讀取位置,limit 切換為讀取限制
讀取 4 個字節(jié)后,狀態(tài)
clear 動作發(fā)生后,狀態(tài)
compact 方法,是把未讀完的部分向前壓縮,然后切換至寫模式
?? 調(diào)試工具類
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有內(nèi)容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可讀取內(nèi)容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}
2.3 ByteBuffer 常見方法
分配空間
可以使用 allocate 方法為 ByteBuffer 分配空間,其它 buffer 類也有該方法
Bytebuffer buf = ByteBuffer.allocate(16);
向 buffer 寫入數(shù)據(jù)
有兩種辦法
- 調(diào)用 channel 的 read 方法
- 調(diào)用 buffer 自己的 put 方法
int readBytes = channel.read(buf);
和
buf.put((byte)127);
從 buffer 讀取數(shù)據(jù)
同樣有兩種辦法
- 調(diào)用 channel 的 write 方法
- 調(diào)用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);
和
byte b = buf.get();
get 方法會讓 position 讀指針向后走,如果想重復(fù)讀取數(shù)據(jù)
- 可以調(diào)用 rewind 方法將 position 重新置為 0
- 或者調(diào)用 get(int i) 方法獲取索引 i 的內(nèi)容,它不會移動讀指針
mark 和 reset
mark 是在讀取時,做一個標(biāo)記,即使 position 改變,只要調(diào)用 reset 就能回到 mark 的位置
注意
rewind 和 flip 都會清除 mark 位置
字符串與 ByteBuffer 互轉(zhuǎn)
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");
debug(buffer1);
debug(buffer2);
CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd |...... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| e4 bd a0 e5 a5 bd |...... |
+--------+-------------------------------------------------+----------------+
class java.nio.HeapCharBuffer
你好
?? Buffer 的線程安全
Buffer 是非線程安全的
2.4 Scattering Reads
分散讀取,有一個文本文件 3parts.txt
onetwothree
使用如下方式讀取,可以將數(shù)據(jù)填充至多個 buffer
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer a = ByteBuffer.allocate(3);
ByteBuffer b = ByteBuffer.allocate(3);
ByteBuffer c = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{a, b, c});
a.flip();
b.flip();
c.flip();
debug(a);
debug(b);
debug(c);
} catch (IOException e) {
e.printStackTrace();
}
結(jié)果
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6f 6e 65 |one |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 77 6f |two |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 72 65 65 |three |
+--------+-------------------------------------------------+----------------+
2.5 Gathering Writes
使用如下方式寫入,可以將多個 buffer 的數(shù)據(jù)填充至 channel
try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer d = ByteBuffer.allocate(4);
ByteBuffer e = ByteBuffer.allocate(4);
channel.position(11);
d.put(new byte[]{'f', 'o', 'u', 'r'});
e.put(new byte[]{'f', 'i', 'v', 'e'});
d.flip();
e.flip();
debug(d);
debug(e);
channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
e.printStackTrace();
}
輸出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 6f 75 72 |four |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 69 76 65 |five |
+--------+-------------------------------------------------+----------------+
文件內(nèi)容
onetwothreefourfive
2.6 練習(xí)
網(wǎng)絡(luò)上有多條數(shù)據(jù)發(fā)送給服務(wù)端,數(shù)據(jù)之間使用 \n 進行分隔
但由于某種原因這些數(shù)據(jù)在接收時,被進行了重新組合,例如原始數(shù)據(jù)有3條為
- Hello,world\n
- I’m zhangsan\n
- How are you?\n
變成了下面的兩個 byteBuffer (黏包,半包)
- Hello,world\nI’m zhangsan\nHo
- w are you?\n
現(xiàn)在要求你編寫程序,將錯亂的數(shù)據(jù)恢復(fù)成原始的按 \n 分隔的數(shù)據(jù)
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
// 11 24
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\nhaha!\n".getBytes());
split(source);
}
private static void split(ByteBuffer source) {
source.flip();
int oldLimit = source.limit();
for (int i = 0; i < oldLimit; i++) {
if (source.get(i) == '\n') {
System.out.println(i);
ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
// 0 ~ limit
source.limit(i + 1);
target.put(source); // 從source 讀,向 target 寫
debugAll(target);
source.limit(oldLimit);
}
}
source.compact();
}
3. 文件編程
3.1 FileChannel
?? FileChannel 工作模式
FileChannel 只能工作在阻塞模式下
獲取
不能直接打開 FileChannel,必須通過 FileInputStream、FileOutputStream 或者 RandomAccessFile 來獲取 FileChannel,它們都有 getChannel 方法
- 通過 FileInputStream 獲取的 channel 只能讀
- 通過 FileOutputStream 獲取的 channel 只能寫
- 通過 RandomAccessFile 是否能讀寫根據(jù)構(gòu)造 RandomAccessFile 時的讀寫模式?jīng)Q定
讀取
會從 channel 讀取數(shù)據(jù)填充 ByteBuffer,返回值表示讀到了多少字節(jié),-1 表示到達(dá)了文件的末尾
int readBytes = channel.read(buffer);
寫入
寫入的正確姿勢如下, SocketChannel
ByteBuffer buffer = ...;
buffer.put(...); // 存入數(shù)據(jù)
buffer.flip(); // 切換讀模式
while(buffer.hasRemaining()) {
channel.write(buffer);
}
在 while 中調(diào)用 channel.write 是因為 write 方法并不能保證一次將 buffer 中的內(nèi)容全部寫入 channel
關(guān)閉
channel 必須關(guān)閉,不過調(diào)用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法會間接地調(diào)用 channel 的 close 方法
位置
獲取當(dāng)前位置
long pos = channel.position();
設(shè)置當(dāng)前位置
long newPos = ...;
channel.position(newPos);
設(shè)置當(dāng)前位置時,如果設(shè)置為文件的末尾
- 這時讀取會返回 -1
- 這時寫入,會追加內(nèi)容,但要注意如果 position 超過了文件末尾,再寫入時在新內(nèi)容和原末尾之間會有空洞(00)
大小
使用 size 方法獲取文件的大小
強制寫入
操作系統(tǒng)出于性能的考慮,會將數(shù)據(jù)緩存,不是立刻寫入磁盤??梢哉{(diào)用 force(true) 方法將文件內(nèi)容和元數(shù)據(jù)(文件的權(quán)限等信息)立刻寫入磁盤
3.2 兩個 Channel 傳輸數(shù)據(jù)
String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();
FileChannel to = new FileOutputStream(TO).getChannel();
) {
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}
long end = System.nanoTime();
System.out.println("transferTo 用時:" + (end - start) / 1000_000.0);
輸出
transferTo 用時:8.2011
超過 2g 大小的文件傳輸
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("to.txt").getChannel();
) {
// 效率高,底層會利用操作系統(tǒng)的零拷貝進行優(yōu)化
long size = from.size();
// left 變量代表還剩余多少字節(jié)
for (long left = size; left > 0; ) {
System.out.println("position:" + (size - left) + " left:" + left);
left -= from.transferTo((size - left), left, to);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
實際傳輸一個超大文件
position:0 left:7769948160
position:2147483647 left:5622464513
position:4294967294 left:3474980866
position:6442450941 left:1327497219
3.3 Path
jdk7 引入了 Path 和 Paths 類
- Path 用來表示文件路徑
- Paths 是工具類,用來獲取 Path 實例
Path source = Paths.get("1.txt"); // 相對路徑 使用 user.dir 環(huán)境變量來定位 1.txt
Path source = Paths.get("d:\\1.txt"); // 絕對路徑 代表了 d:\1.txt
Path source = Paths.get("d:/1.txt"); // 絕對路徑 同樣代表了 d:\1.txt
Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
-
.
代表了當(dāng)前路徑 -
..
代表了上一級路徑
例如目錄結(jié)構(gòu)如下
d:
|- data
|- projects
|- a
|- b
代碼
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路徑
會輸出
d:\data\projects\a\..\b
d:\data\projects\b
3.4 Files
檢查文件是否存在
Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));
創(chuàng)建一級目錄
Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
- 如果目錄已存在,會拋異常 FileAlreadyExistsException
- 不能一次創(chuàng)建多級目錄,否則會拋異常 NoSuchFileException
創(chuàng)建多級目錄用
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);
拷貝文件
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");
Files.copy(source, target);
- 如果文件已存在,會拋異常 FileAlreadyExistsException
如果希望用 source 覆蓋掉 target,需要用 StandardCopyOption 來控制
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
移動文件
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
- StandardCopyOption.ATOMIC_MOVE 保證文件移動的原子性
刪除文件
Path target = Paths.get("helloword/target.txt");
Files.delete(target);
- 如果文件不存在,會拋異常 NoSuchFileException
刪除目錄
Path target = Paths.get("helloword/d1");
Files.delete(target);
- 如果目錄還有內(nèi)容,會拋異常 DirectoryNotEmptyException
遍歷目錄文件
public static void main(String[] args) throws IOException {
Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
System.out.println(dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println(dirCount); // 133
System.out.println(fileCount); // 1479
}
統(tǒng)計 jar 的數(shù)目
Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");
AtomicInteger fileCount = new AtomicInteger();
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
if (file.toFile().getName().endsWith(".jar")) {
fileCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}
});
System.out.println(fileCount); // 724
刪除多級目錄
Path path = Paths.get("d:\\a");
Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc)
throws IOException {
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
?? 刪除很危險
刪除是危險操作,確保要遞歸刪除的文件夾沒有重要內(nèi)容
拷貝多級目錄
long start = System.currentTimeMillis();
String source = "D:\\Snipaste-1.16.2-x64";
String target = "D:\\Snipaste-1.16.2-x64aaa";
Files.walk(Paths.get(source)).forEach(path -> {
try {
String targetName = path.toString().replace(source, target);
// 是目錄
if (Files.isDirectory(path)) {
Files.createDirectory(Paths.get(targetName));
}
// 是普通文件
else if (Files.isRegularFile(path)) {
Files.copy(path, Paths.get(targetName));
}
} catch (IOException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);
4. 網(wǎng)絡(luò)編程
4.1 非阻塞 vs 阻塞
阻塞
- 阻塞模式下,相關(guān)方法都會導(dǎo)致線程暫停
- ServerSocketChannel.accept 會在沒有連接建立時讓線程暫停
- SocketChannel.read 會在沒有數(shù)據(jù)可讀時讓線程暫停
- 阻塞的表現(xiàn)其實就是線程暫停了,暫停期間不會占用 cpu,但線程相當(dāng)于閑置
- 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
- 但多線程下,有新的問題,體現(xiàn)在以下方面
- 32 位 jvm 一個線程 320k,64 位 jvm 一個線程 1024k,如果連接數(shù)過多,必然導(dǎo)致 OOM,并且線程太多,反而會因為頻繁上下文切換導(dǎo)致性能降低
- 可以采用線程池技術(shù)來減少線程數(shù)和線程上下文切換,但治標(biāo)不治本,如果有很多連接建立,但長時間 inactive,會阻塞線程池中所有線程,因此不適合長連接,只適合短連接
服務(wù)器端
// 使用 nio 來理解阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運行
log.debug("connected... {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客戶端發(fā)送的數(shù)據(jù)
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,線程停止運行
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
非阻塞
- 非阻塞模式下,相關(guān)方法都會不會讓線程暫停
- 在 ServerSocketChannel.accept 在沒有連接建立時,會返回 null,繼續(xù)運行
- SocketChannel.read 在沒有數(shù)據(jù)可讀時,會返回 0,但線程不必阻塞,可以去執(zhí)行其它 SocketChannel 的 read 或是去執(zhí)行 ServerSocketChannel.accept
- 寫數(shù)據(jù)時,線程只是等待數(shù)據(jù)寫入 Channel 即可,無需等 Channel 通過網(wǎng)絡(luò)把數(shù)據(jù)發(fā)送出去
- 但非阻塞模式下,即使沒有連接建立,和可讀數(shù)據(jù),線程仍然在不斷運行,白白浪費了 cpu
- 數(shù)據(jù)復(fù)制過程中,線程實際還是阻塞的(AIO 改進的地方)
服務(wù)器端,客戶端代碼不變
// 使用 nio 來理解非阻塞模式, 單線程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 創(chuàng)建了服務(wù)器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 綁定監(jiān)聽端口
ssc.bind(new InetSocketAddress(8080));
// 3. 連接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信
SocketChannel sc = ssc.accept(); // 非阻塞,線程還會繼續(xù)運行,如果沒有連接建立,但sc是null
if (sc != null) {
log.debug("connected... {}", sc);
sc.configureBlocking(false); // 非阻塞模式
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客戶端發(fā)送的數(shù)據(jù)
int read = channel.read(buffer);// 非阻塞,線程仍然會繼續(xù)運行,如果沒有讀到數(shù)據(jù),read 返回 0
if (read > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
}
多路復(fù)用
單線程可以配合 Selector 完成對多個 Channel 可讀寫事件的監(jiān)控,這稱之為多路復(fù)用
- 多路復(fù)用僅針對網(wǎng)絡(luò) IO、普通文件 IO 沒法利用多路復(fù)用
- 如果不用 Selector 的非阻塞模式,線程大部分時間都在做無用功,而 Selector 能夠保證
- 有可連接事件時才去連接
- 有可讀事件才去讀取
- 有可寫事件才去寫入
- 限于網(wǎng)絡(luò)傳輸能力,Channel 未必時時可寫,一旦 Channel 可寫,會觸發(fā) Selector 的可寫事件
4.2 Selector
好處
- 一個線程配合 selector 就可以監(jiān)控多個 channel 的事件,事件發(fā)生線程才去處理。避免非阻塞模式下所做無用功
- 讓這個線程能夠被充分利用
- 節(jié)約了線程的數(shù)量
- 減少了線程上下文切換
創(chuàng)建
Selector selector = Selector.open();
綁定 Channel 事件
也稱之為注冊事件,綁定的事件 selector 才會關(guān)心
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 綁定事件);
- channel 必須工作在非阻塞模式
- FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
- 綁定的事件類型可以有
- connect - 客戶端連接成功時觸發(fā)
- accept - 服務(wù)器端成功接受連接時觸發(fā)
- read - 數(shù)據(jù)可讀入時觸發(fā),有因為接收能力弱,數(shù)據(jù)暫不能讀入的情況
- write - 數(shù)據(jù)可寫出時觸發(fā),有因為發(fā)送能力弱,數(shù)據(jù)暫不能寫出的情況
監(jiān)聽 Channel 事件
可以通過下面三種方法來監(jiān)聽是否有事件發(fā)生,方法的返回值代表有多少 channel 發(fā)生了事件
方法1,阻塞直到綁定事件發(fā)生
int count = selector.select();
方法2,阻塞直到綁定事件發(fā)生,或是超時(時間單位為 ms)
int count = selector.select(long timeout);
方法3,不會阻塞,也就是不管有沒有事件,立刻返回,自己根據(jù)返回值檢查是否有事件
int count = selector.selectNow();
?? select 何時不阻塞
- 事件發(fā)生時
- 客戶端發(fā)起連接請求,會觸發(fā) accept 事件
- 客戶端發(fā)送數(shù)據(jù)過來,客戶端正常、異常關(guān)閉時,都會觸發(fā) read 事件,另外如果發(fā)送的數(shù)據(jù)大于 buffer 緩沖區(qū),會觸發(fā)多次讀取事件
- channel 可寫,會觸發(fā) write 事件
- 在 linux 下 nio bug 發(fā)生時
- 調(diào)用 selector.wakeup()
- 調(diào)用 selector.close()
- selector 所在線程 interrupt
4.3 處理 accept 事件
客戶端代碼為
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務(wù)器端代碼為
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 獲取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍歷所有事件,逐一處理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判斷事件類型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必須處理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 處理完畢,必須將事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
?? 事件發(fā)生后能否不處理
事件發(fā)生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會觸發(fā),這是因為 nio 底層使用的是水平觸發(fā)
4.4 處理 read 事件
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 獲取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍歷所有事件,逐一處理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判斷事件類型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必須處理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("連接已建立: {}", sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
debug(buffer);
}
}
// 處理完畢,必須將事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
開啟兩個客戶端,修改一下發(fā)送文字,輸出
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60367]
21:16:39 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - 連接已建立: java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60378]
21:16:59 [DEBUG] [main] c.i.n.ChannelDemo6 - select count: 1
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64 |world |
+--------+-------------------------------------------------+----------------+
?? 為何要 iter.remove()
因為 select 在事件發(fā)生后,就會將相關(guān)的 key 放入 selectedKeys 集合,但不會在處理完后從 selectedKeys 集合中移除,需要我們自己編碼刪除。例如
- 第一次觸發(fā)了 ssckey 上的 accept 事件,沒有移除 ssckey
- 第二次觸發(fā)了 sckey 上的 read 事件,但這時 selectedKeys 中還有上次的 ssckey ,在處理時因為沒有真正的 serverSocket 連上了,就會導(dǎo)致空指針異常
?? cancel 的作用
cancel 會取消注冊在 selector 上的 channel,并從 keys 集合中刪除 key 后續(xù)不會再監(jiān)聽事件
?? 不處理邊界的問題
以前有同學(xué)寫過這樣的代碼,思考注釋中兩個問題,以 bio 為例,其實 nio 道理是一樣的
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket ss=new ServerSocket(9000);
while (true) {
Socket s = ss.accept();
InputStream in = s.getInputStream();
// 這里這么寫,有沒有問題
byte[] arr = new byte[4];
while(true) {
int read = in.read(arr);
// 這里這么寫,有沒有問題
if(read == -1) {
break;
}
System.out.println(new String(arr, 0, read));
}
}
}
}
客戶端
public class Client {
public static void main(String[] args) throws IOException {
Socket max = new Socket("localhost", 9000);
OutputStream out = max.getOutputStream();
out.write("hello".getBytes());
out.write("world".getBytes());
out.write("你好".getBytes());
max.close();
}
}
輸出
hell
owor
ld?
?好
為什么?
處理消息的邊界
- 一種思路是固定消息長度,數(shù)據(jù)包大小一樣,服務(wù)器按預(yù)定長度讀取,缺點是浪費帶寬
- 另一種思路是按分隔符拆分,缺點是效率低
- TLV 格式,即 Type 類型、Length 長度、Value 數(shù)據(jù),類型和長度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點是 buffer 需要提前分配,如果內(nèi)容過大,則影響 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
服務(wù)器端
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一條完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把這條完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 從 source 讀,向 target 寫
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); // 0123456789abcdef position 16 limit 16
}
public static void main(String[] args) throws IOException {
// 1. 創(chuàng)建 selector, 管理多個 channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立 selector 和 channel 的聯(lián)系(注冊)
// SelectionKey 就是將來事件發(fā)生后,通過它可以知道事件和哪個channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只關(guān)注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select 方法, 沒有事件發(fā)生,線程阻塞,有事件,線程才會恢復(fù)運行
// select 在事件未處理時,它不會阻塞, 事件發(fā)生后要么處理,要么取消,不能置之不理
selector.select();
// 4. 處理事件, selectedKeys 內(nèi)部包含了所有發(fā)生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 處理key 時,要從 selectedKeys 集合中刪除,否則下次處理就會有問題
iter.remove();
log.debug("key: {}", key);
// 5. 區(qū)分事件類型
if (key.isAcceptable()) { // 如果是 accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 將一個 byteBuffer 作為附件關(guān)聯(lián)到 selectionKey 上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) { // 如果是 read
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到觸發(fā)事件的channel
// 獲取 selectionKey 上關(guān)聯(lián)的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常斷開,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 需要擴容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因為客戶端斷開了,因此需要將 key 取消(從 selector 的 keys 集合中真正刪除 key)
}
}
}
}
}
客戶端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress address = sc.getLocalAddress();
// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();
ByteBuffer 大小分配
- 每個 channel 都需要記錄可能被切分的消息,因為 ByteBuffer 不能被多個 channel 共同使用,因此需要為每個 channel 維護一個獨立的 ByteBuffer
- ByteBuffer 不能太大,比如一個 ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內(nèi)存,因此需要設(shè)計大小可變的 ByteBuffer
- 一種思路是首先分配一個較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點是消息連續(xù)容易處理,缺點是數(shù)據(jù)拷貝耗費性能,參考實現(xiàn) http://tutorials.jenkov.com/java-performance/resizable-array.html
- 另一種思路是用多個數(shù)組組成 buffer,一個數(shù)組不夠,把多出來的內(nèi)容寫入新的數(shù)組,與前面的區(qū)別是消息存儲不連續(xù)解析復(fù)雜,優(yōu)點是避免了拷貝引起的性能損耗
4.5 處理 write 事件
一次無法寫完例子
- 非阻塞模式下,無法保證把 buffer 中所有數(shù)據(jù)都寫入 channel,因此需要追蹤 write 方法的返回值(代表實際寫入字節(jié)數(shù))
- 用 selector 監(jiān)聽所有 channel 的可寫事件,每個 channel 都需要一個 key 來跟蹤 buffer,但這樣又會導(dǎo)致占用內(nèi)存過多,就有兩階段策略
- 當(dāng)消息處理器第一次寫入消息時,才將 channel 注冊到 selector 上
- selector 檢查 channel 上的可寫事件,如果所有的數(shù)據(jù)寫完了,就取消 channel 的注冊
- 如果不取消,會每次可寫均會觸發(fā) write 事件
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客戶端發(fā)送內(nèi)容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示實際寫了多少字節(jié)
System.out.println("實際寫入字節(jié):" + write);
// 4. 如果有剩余未讀字節(jié),才需要關(guān)注寫事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有關(guān)注事件的基礎(chǔ)上,多關(guān)注 寫事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作為附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("實際寫入字節(jié):" + write);
if (!buffer.hasRemaining()) { // 寫完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}
客戶端
public class WriteClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}
}
}
?? write 為何要取消
只要向 channel 發(fā)送數(shù)據(jù)時,socket 緩沖可寫,這個事件會頻繁觸發(fā),因此應(yīng)當(dāng)只在 socket 緩沖區(qū)寫不下時再關(guān)注可寫事件,數(shù)據(jù)寫完之后再取消關(guān)注
4.6 更進一步
?? 利用多線程優(yōu)化
現(xiàn)在都是多核 cpu,設(shè)計時要充分考慮別讓 cpu 的力量被白白浪費
前面的代碼只有一個選擇器,沒有充分利用多核 cpu,如何改進呢?
分兩組選擇器
- 單線程配一個選擇器,專門處理 accept 事件
- 創(chuàng)建 cpu 核心數(shù)的線程,每個線程配一個選擇器,輪流處理 read 事件
public class ChannelDemo7 {
public static void main(String[] args) throws IOException {
new BossEventLoop().register();
}
@Slf4j
static class BossEventLoop implements Runnable {
private Selector boss;
private WorkerEventLoop[] workers;
private volatile boolean start = false;
AtomicInteger index = new AtomicInteger();
public void register() throws IOException {
if (!start) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
boss = Selector.open();
SelectionKey ssckey = ssc.register(boss, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
workers = initEventLoops();
new Thread(this, "boss").start();
log.debug("boss start...");
start = true;
}
}
public WorkerEventLoop[] initEventLoops() {
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i);
}
return workerEventLoops;
}
@Override
public void run() {
while (true) {
try {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
log.debug("{} connected", sc.getRemoteAddress());
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Slf4j
static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false;
private int index;
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
public WorkerEventLoop(int index) {
this.index = index;
}
public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start = true;
}
tasks.add(() -> {
try {
SelectionKey sckey = sc.register(worker, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
worker.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
worker.wakeup();
}
@Override
public void run() {
while (true) {
try {
worker.select();
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Set<SelectionKey> keys = worker.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
log.debug("{} message:", sc.getRemoteAddress());
debugAll(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
?? 如何拿到 cpu 個數(shù)
- Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因為容器不是物理隔離的,會拿到物理 cpu 個數(shù),而不是容器申請時的個數(shù)
- 這個問題直到 jdk 10 才修復(fù),使用 jvm 參數(shù) UseContainerSupport 配置, 默認(rèn)開啟
4.7 UDP
- UDP 是無連接的,client 發(fā)送數(shù)據(jù)不會管 server 是否開啟
- server 這邊的 receive 方法會將接收到的數(shù)據(jù)存入 byte buffer,但如果數(shù)據(jù)報文超過 buffer 大小,多出來的數(shù)據(jù)會被默默拋棄
首先啟動服務(wù)器端
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
輸出
waiting...
運行客戶端文章來源:http://www.zghlxwxcb.cn/news/detail-432170.html
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}
接下來服務(wù)器端輸出文章來源地址http://www.zghlxwxcb.cn/news/detail-432170.html
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
到了這里,關(guān)于NIO基礎(chǔ) - 網(wǎng)絡(luò)編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!