創(chuàng)建阻塞的服務器
當 ServerSocketChannel
與 SockelChannel
采用默認的阻塞模式時,為了同時處理多個客戶的連接,必須使用多線程
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private ExecutorService executorService; //線程池
private static final int POOL_MULTIPLE = 4; //線程池中工作線程的數(shù)目
public EchoServer() throws IOException {
//創(chuàng)建一個線程池
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
//創(chuàng)建一個ServerSocketChannel對象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一個主機上關閉了服務器程序,緊接著再啟動該服務器程序時,可以順利綁定相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//把服務器進程與一個本地端口綁定
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("服務器啟動");
}
public void service() {
while (true) {
SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept();
//處理客戶連接
executorService.execute(new Handler(socketChannel));
} catch(IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws IOException {
new EchoServer().service();
}
//處理客戶連按
class Handler implements Runnable {
private SocketChannel socketChannel;
public Handler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void run() {
handle(socketChannel);
}
public void handle(SocketChannel socketChannel) {
try {
//獲得與socketChannel關聯(lián)的Socket對象
Socket socket = socketChannel.socket();
System.out.println("接收到客戶連接,來自:" + socket.getInetAddress() + ":" + socket.getPort());
BufferedReader br = getReader(socket);
PrintWriter pw = getWriter(socket);
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println(msg);
pw.println(echo(msg));
if (msg.equals("bye")) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(socketChannel != null) {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private PrintWriter getWriter(Socket socket) throws IOException {
OutputStream socketOut = socket.getOutputStream();
return new PrintWriter(socketOut,true);
}
private BufferedReader getReader(Socket socket) throws IOException {
InputStream socketIn = socket.getInputStream();
return new BufferedReader(new InputStreamReader(socketIn));
}
public String echo(String msg) {
return "echo:" + msg;
}
}
創(chuàng)建非阻塞的服務器
在非阻塞模式下,EchoServer
只需要啟動一個主線程,就能同時處理三件事:
- 接收客戶的連接
- 接收客戶發(fā)送的數(shù)據(jù)
- 向客戶發(fā)回響應數(shù)據(jù)
EchoServer
委托 Selector
來負責監(jiān)控接收連接就緒事件、讀就緒事件和寫就緒事件如果有特定事件發(fā)生,就處理該事件
// 創(chuàng)建一個Selector對象
selector = Selector.open();
//創(chuàng)建一個ServerSocketChannel對象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一個主機上關閉了服務器程序,緊接著再啟動該服務器程序時
//可以順利綁定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服務器進程與一個本地端口綁定
serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer
類的 service()
方法負責處理本節(jié)開頭所說的三件事,體現(xiàn)其主要流程的代碼如下:
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1層while循環(huán)
while(selector.select() > 0) {
//獲得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2層while循環(huán)
while (it.hasNext()) {
SelectionKey key = null;
//處理SelectionKey
try {
//取出一個SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey從Selector 的selected-key 集合中刪除
it.remove();
1f (key.isAcceptable()) { 處理接收連接就緒事件; }
if (key.isReadable()) { 處理讀就緒水件; }
if (key.isWritable()) { 處理寫就緒事件; }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使這個SelectionKey失效
key.cancel();
//關閉與這個SelectionKey關聯(lián)的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}
- 首先由
ServerSocketChannel
向Selector
注冊接收連接就緒事件,如果Selector
監(jiān)控到該事件發(fā)生,就會把相應的SelectionKey
對象加入selected-keys
集合 - 第一層 while 循環(huán),不斷詢問
Selector
已經(jīng)發(fā)生的事件,select()
方法返回當前相關事件已經(jīng)發(fā)生的SelectionKey
的個數(shù),如果當前沒有任何事件發(fā)生,該方法會阻塞下去,直到至少有一個事件發(fā)生。Selector
的selectedKeys()
方法返回selected-keys
集合,它存放了相關事件已經(jīng)發(fā)生的SelectionKey
對象 - 第二層 while 循環(huán),從
selected-keys
集合中依次取出每個SelectionKey
對象并從集合中刪除,,然后調(diào)用isAcceptable()
、isReadable()
和isWritable()
方法判斷到底是哪種事件發(fā)生了,從而做出相應的處理
1. 處理接收連接就緒事件
if (key.isAcceptable()) {
//獲得與SelectionKey關聯(lián)的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//獲得與客戶連接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel設置為非阻塞模式
socketChannel.configureBlocking(false);
//創(chuàng)建一個用于存放用戶發(fā)送來的數(shù)據(jù)的級沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注冊讀就緒事件和寫就緒事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
2. 處理讀就緒事件
public void receive(SelectionKey key) throws IOException {
//獲得與SelectionKey關聯(lián)的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關聯(lián)的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//創(chuàng)建一個ByteBuffer用于存放讀到的數(shù)據(jù)
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的極限設為容量
buffer.limit(buffer.capacity());
//把readBuff中的內(nèi)容拷貝到buffer
buffer.put(readBuff);
}
3. 處理寫就緒事件
public void send(SelectionKey key) throws IOException {
//獲得與SelectionKey關聯(lián)的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關聯(lián)的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串
String data = decode(buffer);
//如果還沒有讀到一行數(shù)據(jù)就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行數(shù)據(jù)
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//輸出outputBuffer的所有字節(jié)
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置設為temp的極限
buffer.position(temp.limit()):
//刪除buffer已經(jīng)處理的數(shù)據(jù)
buffer.compact();
//如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關閉SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}
完整代碼如下:
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
// 創(chuàng)建一個Selector對象
selector = Selector.open();
//創(chuàng)建一個ServerSocketChannel對象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一個主機上關閉了服務器程序,緊接著再啟動該服務器程序時
//可以順利綁定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服務器進程與一個本地端口綁定
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1層while循環(huán)
while(selector.select() > 0) {
//獲得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2層while循環(huán)
while (it.hasNext()) {
SelectionKey key = null;
//處理SelectionKey
try {
//取出一個SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey從Selector 的selected-key 集合中刪除
it.remove();
1f (key.isAcceptable()) {
//獲得與SelectionKey關聯(lián)的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//獲得與客戶連接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel設置為非阻塞模式
socketChannel.configureBlocking(false);
//創(chuàng)建一個用于存放用戶發(fā)送來的數(shù)據(jù)的級沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注冊讀就緒事件和寫就緒事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
if (key.isReadable()) { receive(key); }
if (key.isWritable()) { send(key); }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使這個SelectionKey失效
key.cancel();
//關閉與這個SelectionKey關聯(lián)的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}
public void receive(SelectionKey key) throws IOException {
//獲得與SelectionKey關聯(lián)的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關聯(lián)的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//創(chuàng)建一個ByteBuffer用于存放讀到的數(shù)據(jù)
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的極限設為容量
buffer.limit(buffer.capacity());
//把readBuff中的內(nèi)容拷貝到buffer
buffer.put(readBuff);
}
public void send(SelectionKey key) throws IOException {
//獲得與SelectionKey關聯(lián)的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關聯(lián)的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串
String data = decode(buffer);
//如果還沒有讀到一行數(shù)據(jù)就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行數(shù)據(jù)
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//輸出outputBuffer的所有字節(jié)
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置設為temp的極限
buffer.position(temp.limit()):
//刪除buffer已經(jīng)處理的數(shù)據(jù)
buffer.compact();
//如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關閉SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}
//解碼
public String decode(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toStrinq();
}
//編碼
public ByteBuffer encode(String str) {
return charset.encode(str);
}
public static void main(String args[])throws Exception {
EchoServer server = new EchoServer();
server.service();
}
}
阻塞模式與非阻塞模式混合使用
使用非阻塞模式時,ServerSocketChannel
以及 SocketChannel
都被設置為非阻塞模式,這使得接收連接、接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作都采用非阻塞模式,EchoServer
采用一個線程同時完成這些操作
假如有許多客戶請求連接,可以把接收客戶連接的操作單獨由一個線程完成,把接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作由另一個線程完成,這可以提高服務器的并發(fā)性能
負責接收客戶連接的線程按照阻塞模式工作,如果收到客戶連接,就向 Selector
注冊讀就緒和寫就緒事件,否則進入阻塞狀態(tài),直到接收到了客戶的連接。負責接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的線程按照非阻塞模式工作,只有在讀就緒或?qū)懢途w事件發(fā)生時,才執(zhí)行相應的接收數(shù)據(jù)和發(fā)送數(shù)據(jù)操作
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector = null;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void accept() {
while(true) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
synchronized(gate) {
selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
} catch(IOException e) {
e.printStackTrace();
}
}
}
private Object gate=new Object();
public void service() throws IOException {
while(true) {
synchronized(gate){}
int n = selector.select();
if(n == 0) continue;
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
while (it.hasNext()) {
SelectionKey key = null;
try {
it.remove();
if (key.isReadable()) {
receive(key);
}
if (key.isWritable()) {
send(key);
}
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
key.cancel();
key.channel().close();
}
} catch(Exception ex) { e.printStackTrace(); }
}
}
}
}
public void receive(SelectionKey key) throws IOException {
...
}
public void send(SelectionKey key) throws IOException {
...
}
public String decode(ByteBuffer buffer) {
...
}
public ByteBuffer encode(String str) {
...
}
public static void main(String args[])throws Exception {
final EchoServer server = new EchoServer();
Thread accept = new Thread() {
public void run() {
server.accept();
}
};
accept.start();
server.service();
}
}
注意一點:主線程的 selector select()
方法和 Accept 線程的 register(...)
方法都會造成阻塞,因為他們都會操作 Selector
對象的共享資源 all-keys
集合,這有可能會導致死鎖
導致死鎖的具體情形是:Selector
中尚沒有任何注冊的事件,即 all-keys
集合為空,主線程執(zhí)行 selector.select()
方法時將進入阻塞狀態(tài),只有當 Accept 線程向 Selector
注冊了事件,并且該事件發(fā)生后,主線程才會從 selector.select()
方法返回。然而,由于主線程正在 selector.select()
方法中阻塞,這使得 Acccept
線程也在 register()
方法中阻塞。Accept 線程無法向 Selector 注冊事件,而主線程沒有任何事件可以監(jiān)控,所以這兩個線程將永遠阻塞下去文章來源:http://www.zghlxwxcb.cn/news/detail-451389.html
為了避免對共享資源的競爭,同步機制使得一個線程執(zhí)行 register()
時,不允許另一個線程同時執(zhí)行 select()
方法,反之亦然文章來源地址http://www.zghlxwxcb.cn/news/detail-451389.html
到了這里,關于Java 網(wǎng)絡編程 —— 實現(xiàn)非阻塞式的服務器的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!