Linux高性能服務器編程
本文是讀書筆記,如有侵權,請聯(lián)系刪除。
參考
Linux高性能服務器編程源碼: https://github.com/raichen/LinuxServerCodes
豆瓣: Linux高性能服務器編程
第09章I/O復用
I/O復用使得程序能同時監(jiān)聽多個文件描述符,這對提高程序的性能至關重要。通常,網(wǎng)絡程序在下列情況下需要使用I/0復用技術:
客戶端程序要同時處理多個socket。比如本章將要討論的非阻塞connect技術。
客戶端程序要同時處理用戶輸入和網(wǎng)絡連接。比如本章將要討論的聊天室程序。
TCP服務器要同時處理監(jiān)聽socket和連接socket。這是I/O復用使用最多的場合。后 續(xù)章節(jié)將展示很多這方面的例子。
服務器要同時處理TCP請求和UDP請求。比如本章將要討論的回射服務器。
服務器要同時監(jiān)聽多個端口,或者處理多種服務。比如本章將要討論的xinetd 服務器。
需要指出的是,I/O復用雖然能同時監(jiān)聽多個文件描述符,但它本身是阻塞的。并且當多個文件描述符同時就緒時,如果不采取額外的措施,程序就只能按順序依次處理其中的每 一個文件描述符,這使得服務器程序看起來像是串行工作的。如果要實現(xiàn)并發(fā),只能使用多進程或多線程等編程手段。Linux下實現(xiàn)I/O復用的系統(tǒng)調用主要有select、poll和epoll,本章將依次討論之,然后介紹使用它們的幾個實例。
9.1 select系統(tǒng)調用
select系統(tǒng)調用的用途是:在一段指定時間內,監(jiān)聽用戶感興趣的文件描述符上的可讀、可寫和異常等事件。本節(jié)先介紹select系統(tǒng)調用的API,然后討論 select判斷文件描述符就緒的條件,最后給出它在處理帶外數(shù)據(jù)中的實際應用。
9.1.1 select API
select系統(tǒng)調用的原型如下:
#include <sys/select.h>
int select( int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds,
struct timeval* timeout );
1)nfds參數(shù)指定被監(jiān)聽的文件描述符的總數(shù)。它通常被設置為select 監(jiān)聽的所有文件描述符中的最大值加1,因為文件描述符是從0開始計數(shù)的。
2)readfds、writefds和exceptfds參數(shù)分別指向可讀、可寫和異常等事件對應的文件描述符集合。應用程序調用select函數(shù)時,通過這3個參數(shù)傳入自己感興趣的文件描述符。select調用返回時,內核將修改它們來通知應用程序哪些文件描述符已經(jīng)就緒。這3個參數(shù)是fd_set結構指針類型。fd_set結構體的定義如下:
由以上定義可見,fd_set結構體僅包含一個整型數(shù)組,該數(shù)組的每個元素的每一位(bit) 標記一個文件描述符。fd_set能容納的文件描述符數(shù)量由FD_SETSIZE指定,這就限制了 select 能同時處理的文件描述符的總量。
由于位操作過于煩瑣,我們應該使用下面的一系列宏來訪問fd_set結構體中的位:
3)timeout參數(shù)用來設置select函數(shù)的超時時間。它是一個timeval結構類型的指針,采 用指針參數(shù)是因為內核將修改它以告訴應用程序select等待了多久。不過我們不能完全信任 select調用返回后的timeout值,比如調用失敗時timeout值是不確定的。
timeval結構體的定 義如下:
struct timeval
{
1ong tv_sec; /*秒數(shù)*/
long tv_usec; /*微秒數(shù)*/
};
由以上定義可見,select給我們提供了一個微秒級的定時方式。如果給timeout變量的 tv_sec成員和tv_usec成員都傳遞0,則select將立即返回。如果給timeout傳遞NULL,則 select將一直阻塞,直到某個文件描述符就緒。select 成功時返回就緒(可讀、可寫和異常)文件描述符的總數(shù)。如果在超時時間內沒 有任何文件描述符就緒,select將返回0。select失敗時返回-1并設置errno。如果在select 等待期間,程序接收到信號,則select 立即返回-1,并設置 errno為EINTR。
select
函數(shù)用于在一組文件描述符上等待某個事件發(fā)生,可以等待多個文件描述符上的可讀、可寫或異常等事件。該函數(shù)的參數(shù)包括:
-
nfds
:監(jiān)視的文件描述符集合中最大的文件描述符值加1。 -
readfds
、writefds
、exceptfds
:分別是用于監(jiān)視可讀、可寫、異常事件的文件描述符集合。 -
timeout
:是一個指向struct timeval
結構體的指針,用于設置超時時間。如果為NULL
,表示一直等待,直到有事件發(fā)生。
函數(shù)返回時,可以通過檢查文件描述符集合來確定哪些文件描述符上發(fā)生了事件。以下是一個簡單的示例代碼:
#include <sys/select.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
// 創(chuàng)建文件描述符集合
fd_set read_fds;
// 初始化文件描述符集合
FD_ZERO(&read_fds);
// 添加標準輸入文件描述符到集合
FD_SET(STDIN_FILENO, &read_fds);
// 設置超時時間為5秒
struct timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;
// 調用select等待事件
int result = select(STDIN_FILENO + 1, &read_fds, NULL, NULL, &timeout);
if (result == -1) {
perror("select");
exit(EXIT_FAILURE);
} else if (result == 0) {
printf("No data within 5 seconds.\n");
} else {
if (FD_ISSET(STDIN_FILENO, &read_fds)) {
printf("Data is available on standard input.\n");
// 讀取標準輸入的數(shù)據(jù)
char buffer[1024];
ssize_t bytesRead = read(STDIN_FILENO, buffer, sizeof(buffer));
if (bytesRead > 0) {
printf("Read %zd bytes: %.*s\n", bytesRead, (int)bytesRead, buffer);
} else {
perror("read");
}
}
}
return 0;
}
在上述代碼中,select
用于等待標準輸入上的可讀事件,如果在5秒內有數(shù)據(jù)可讀,則會輸出相應的信息。
9.1.2文件描述符就緒條件
哪些情況下文件描述符可以被認為是可讀、可寫或者出現(xiàn)異常,對于select的使用非常關鍵。
在網(wǎng)絡編程中,下列情況下socket可讀:
-
socket內核接收緩存區(qū)中的字節(jié)數(shù)大于或等于其低水位標記SO_RCVLOWAT。此時 我們可以無阻塞地讀該socket,并且讀操作返回的字節(jié)數(shù)大于0。
-
socket通信的對方關閉連接。此時對該socket的讀操作將返回0。
-
監(jiān)聽 socket 上有新的連接請求。
-
socket 上有未處理的錯誤。此時我們可以使用 getsockopt 來讀取和清除該錯誤。
下列情況下 socket 可寫:
- socket內核發(fā)送緩存區(qū)中的可用字節(jié)數(shù)大于或等于其低水位標記SO_SNDLOWAT。此時我們可以無阻塞地寫該socket,并且寫操作返回的字節(jié)數(shù)大于0。
- socket的寫操作被關閉。對寫操作被關閉的socket執(zhí)行寫操作將觸發(fā)一個SIGPIPE信號。
- socket使用非阻塞connect連接成功或者失?。ǔ瑫r)之后。
- socket 上有未處理的錯誤。此時我們可以使用getsockopt 來讀取和清除該錯誤。
網(wǎng)絡程序中,select能處理的異常情況只有一種:socket上接收到帶外數(shù)據(jù)。下面我們詳細討論之。
9.1.3處理帶外數(shù)據(jù)
上一小節(jié)提到,socket上接收到普通數(shù)據(jù)和帶外數(shù)據(jù)都將使select返回,但socket處于不同的就緒狀態(tài):前者處于可讀狀態(tài),后者處于異常狀態(tài)。代碼清單9-1描述了select是如 何同時處理二者的。
9-1use_select.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
int main(int argc, char *argv[])
{
// 檢查命令行參數(shù)
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
printf("ip is %s and port is %d\n", ip, port);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
// 創(chuàng)建監(jiān)聽套接字
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
// 綁定地址信息
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
// 監(jiān)聽連接
ret = listen(listenfd, 5);
assert(ret != -1);
// 接受連接
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0)
{
printf("errno is: %d\n", errno);
close(listenfd);
}
// 打印連接信息
char remote_addr[INET_ADDRSTRLEN];
printf("connected with ip: %s and port: %d\n", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));
char buf[1024];
fd_set read_fds;
fd_set exception_fds;
FD_ZERO(&read_fds);
FD_ZERO(&exception_fds);
int nReuseAddr = 1;
setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));
while (1)
{
memset(buf, '\0', sizeof(buf));
FD_SET(connfd, &read_fds);
FD_SET(connfd, &exception_fds);
// 監(jiān)視連接套接字上的可讀和異常事件
ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
printf("select one\n");
if (ret < 0)
{
printf("selection failure\n");
break;
}
// 處理可讀事件
if (FD_ISSET(connfd, &read_fds))
{
ret = recv(connfd, buf, sizeof(buf) - 1, 0);
if (ret <= 0)
{
break;
}
printf("get %d bytes of normal data: %s\n", ret, buf);
}
// 處理異常事件
else if (FD_ISSET(connfd, &exception_fds))
{
ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
if (ret <= 0)
{
break;
}
printf("get %d bytes of oob data: %s\n", ret, buf);
}
}
// 關閉連接套接字和監(jiān)聽套接字
close(connfd);
close(listenfd);
return 0;
}
此代碼是一個簡單的TCP服務器程序,監(jiān)聽指定的IP地址和端口,接受客戶端連接后,通過 select
函數(shù)監(jiān)視連接套接字上的可讀和異常事件。當有數(shù)據(jù)可讀時,通過 recv
函數(shù)接收數(shù)據(jù)并打?。划斢袔鈹?shù)據(jù)(OOB)時,同樣通過 recv
函數(shù)接收并打印。
對于帶外數(shù)據(jù)的解釋:
帶外數(shù)據(jù)(Out-of-Band Data,簡稱OOB數(shù)據(jù))是在TCP通信中的一種特殊數(shù)據(jù)傳輸機制。在正常的數(shù)據(jù)傳輸過程中,數(shù)據(jù)按照先后順序被傳送,但帶外數(shù)據(jù)允許在正常數(shù)據(jù)之外傳送一些緊急的、優(yōu)先級較高的信息。
TCP的帶外數(shù)據(jù)機制提供了一種通知機制,使得通信的一方可以向對方發(fā)送一些額外的信息,而不會影響正常的數(shù)據(jù)傳輸順序。帶外數(shù)據(jù)通常用于緊急通知或優(yōu)先級較高的控制信息。
在使用TCP時,帶外數(shù)據(jù)的發(fā)送和接收通常通過 send
和 recv
函數(shù)的 MSG_OOB
標志來實現(xiàn)。發(fā)送端通過 send
函數(shù)發(fā)送帶外數(shù)據(jù),接收端通過 recv
函數(shù)的 MSG_OOB
標志來接收帶外數(shù)據(jù)。
在上述的代碼示例中,通過設置套接字選項 SO_OOBINLINE
,將帶外數(shù)據(jù)與普通數(shù)據(jù)一起接收,然后通過 recv
函數(shù)的 MSG_OOB
標志處理帶外數(shù)據(jù)。
9.2 poll系統(tǒng)調用
poll系統(tǒng)調用和select類似,也是在指定時間內輪詢一定數(shù)量的文件描述符,以測試其中是否有就緒者。poll的原型如下:
#include <pol1.h>
int poll( struct pollfd* fds, nfds_t nfds, int timeout );
1)fds參數(shù)是一個pollfd結構類型的數(shù)組,它指定所有我們感興趣的文件描述符上發(fā)生的可讀、可寫和異常等事件。pollfd結構體的定義如下:
struct pollfd
{
int fd; /*文件描述符*/
short events; /*注冊的事件*/
short revents; /*實際發(fā)生的事件,由內核填充*/
};
其中,fd成員指定文件描述符;events成員告訴poll監(jiān)聽fd上的哪些事件,它是一系列事件的按位或:revents成員則由內核修改,以通知應用程序fd上實際發(fā)生了哪些事件。poll 支持的事件類型如表9-1所示。
表9-1中,POLLRDNORM、POLLRDBAND、POLLWRNORM、POLLWRBAND由XOPEN規(guī)范定義。它們實際上是將POLLIN事件和POLLOUT事件分得更細致,以區(qū)別對待普通數(shù)據(jù)和優(yōu)先數(shù)據(jù)。但Linux并不完全支持它們。
通常,應用程序需要根據(jù)recv調用的返回值來區(qū)分socket上接收到的是有效數(shù)據(jù)還是對方關閉連接的請求,并做相應的處理。不過,自Linux內核2.6.17開始,GNU為poll系統(tǒng)調 用增加了一個POLLRDHUP事件,它在socket上接收到對方關閉連接的請求之后觸發(fā)。這為我們區(qū)分上述兩種情況提供了一種更簡單的方式。但使用POLLRDHUP事件時,我們需要在代碼最開始處定義_GNU SOURCE。
2)nfds參數(shù)指定被監(jiān)聽事件集合fds的大小。其類型nfds t的定義如下: typedef unsigned long int nfds_t;
3)timeout參數(shù)指定poll的超時值,單位是毫秒。當timeout為-l時,poll調用將永遠阻塞,直到某個事件發(fā)生;當timeout為0時,poll調用將立即返回。poll系統(tǒng)調用的返回值的含義與select相同。
poll
函數(shù)用于監(jiān)視多個文件描述符,以查看它們是否準備好進行 I/O 操作。它提供了對 select
的更靈活替代,并可以高效處理大量文件描述符。
以下是正確的頭文件和使用 poll
的示例:
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
int main() {
struct pollfd fds[1];
int timeout = 5000; // 超時時間,單位為毫秒
// 初始化要監(jiān)視的文件描述符
fds[0].fd = /* 您的文件描述符 */;
fds[0].events = POLLIN; // 監(jiān)視可讀性
// 調用 poll
int result = poll(fds, 1, timeout);
if (result == -1) {
perror("poll");
exit(EXIT_FAILURE);
} else if (result == 0) {
printf("發(fā)生超時。\n");
} else {
if (fds[0].revents & POLLIN) {
printf("數(shù)據(jù)準備好讀取。\n");
// 處理讀取操作中的數(shù)據(jù)
}
// 如果需要,可以檢查其他事件
}
return 0;
}
在這個例子中,poll
用于監(jiān)視具有指定超時的單個文件描述符的可讀性。代碼檢查文件描述符是否準備好進行讀取,并可以根據(jù)需要擴展以處理其他事件。
9.3 epoll系列系統(tǒng)調用
9.3.1內核事件表
epoll是Linux特有的I/O復用函數(shù)。它在實現(xiàn)和使用上與select、poll有很大差異。首 先,epoll使用一組函數(shù)來完成任務,而不是單個函數(shù)。其次,epoll把用戶關心的文件描述 符上的事件放在內核里的一個事件表中,從而無須像select和poll那樣每次調用都要重復傳 入文件描述符集或事件集。但epoll需要使用一個額外的文件描述符,來唯一標識內核中的 這個事件表。這個文件描述符使用如下epoll_create函數(shù)來創(chuàng)建:
#include <sys/epoll.h>
int epol1_create( int size )
size參數(shù)現(xiàn)在并不起作用,只是給內核一個提示,告訴它事件表需要多大。該函數(shù)返回 的文件描述符將用作其他所有epoll系統(tǒng)調用的第一個參數(shù),以指定要訪問的內核事件表。
下面的函數(shù)用來操作epoll的內核事件表:
#include <sys/epoll.h>
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event )
fd參數(shù)是要操作的文件描述符,op參數(shù)則指定操作類型。操作類型有如下3種:
EPOLL_CTL_ADD,往事件表中注冊fd上的事件。
EPOLL_CTL_MOD,修改fd上的注冊事件。
EPOLL_CTL_DEL,刪除fd上的注冊事件。
event參數(shù)指定事件,它是epoll_event結構指針類型。
epoll_event 的定義如下:
struct epoll_event
{
__uint32_t events; /* epoll事件*/
epoll_data_t data; /*用戶數(shù)據(jù) */
};
其中events 成員描述事件類型。epoll支持的事件類型和poll基本相同。表示epoll事件 類型的宏是在poll對應的宏前加上“E”,比如epoll的數(shù)據(jù)可讀事件是EPOLLIN。但epoll有 兩個額外的事件類型—EPOLLET和EPOLLONESHOT。它們對于epoll的高效運作非常關 鍵,我們將在后面討論它們。data成員用于存儲用戶數(shù)據(jù),
其類型epoll_data_t的定義如下:
typedef union epoll_data
{
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
epoll_data_t是一個聯(lián)合體,其4個成員中使用最多的是fd,它指定事件所從屬的目標 文件描述符。ptr成員可用來指定與fd相關的用戶數(shù)據(jù)。但由于epoll_data_t是一個聯(lián)合體, 我們不能同時使用其ptr成員和fd成員,因此,如果要將文件描述符和用戶數(shù)據(jù)關聯(lián)起來 (正如8.5.2小節(jié)討論的將句柄和事件處理器綁定一樣),以實現(xiàn)快速的數(shù)據(jù)訪問,只能使用其他手段,比如放棄使用epoll_data_t的fd成員,而在ptr指向的用戶數(shù)據(jù)中包含fd。
epoll_ctl成功時返回0,失敗則返回-1并設置errmo。
9.3.2 epoll_wait 函數(shù)
epoll系列系統(tǒng)調用的主要接口是epoll wait 函數(shù)。它在一段超時時間內等待一組文件描 述符上的事件,其原型如下:
#include <sys/epoll.h>
int epoll_wait( int epfd, struct epoll_event* events,
int maxevents,int timeout );
該函數(shù)成功時返回就緒的文件描述符的個數(shù),失敗時返回-1并設置errno。關于該函數(shù)的參數(shù),我們從后往前討論。timeout參數(shù)的含義與poll接口的timeout參數(shù) 相同。maxevents參數(shù)指定最多監(jiān)聽多少個事件,它必須大于0。
epoll_wait函數(shù)如果檢測到事件,就將所有就緒的事件從內核事件表(由epfd參數(shù)指 定)中復制到它的第二個參數(shù)events指向的數(shù)組中。這個數(shù)組只用于輸出 epoll_wait 檢測到 的就緒事件,而不像select和poll的數(shù)組參數(shù)那樣既用于傳入用戶注冊的事件,又用于輸出 內核檢測到的就緒事件。這就極大地提高了應用程序索引就緒文件描述符的效率。代碼清單 9-2體現(xiàn)了這個差別。
poll
和 epoll
都是用于實現(xiàn) I/O 多路復用的機制,但它們在使用上有一些差異。主要的區(qū)別包括性能和觸發(fā)方式。
1. 性能差異:
-
poll
在文件描述符較多的情況下性能較差,因為它是線性掃描的,時間復雜度為O(n)。 -
epoll
使用事件通知的方式,對于大量的文件描述符能夠更高效地處理,時間復雜度為O(1)。
2. 觸發(fā)方式:
-
poll
是水平觸發(fā)(Level Triggered)的,當文件描述符中的數(shù)據(jù)準備好時,每次調用poll
都會通知。 -
epoll
支持水平觸發(fā)和邊緣觸發(fā)(Edge Triggered),可以選擇性地只在狀態(tài)變化時通知。
以下是簡單的使用示例:
使用 poll 的示例:
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
int main() {
struct pollfd fds[1];
int timeout = 5000; // 超時時間,單位為毫秒
// 初始化要監(jiān)視的文件描述符
fds[0].fd = /* 您的文件描述符 */;
fds[0].events = POLLIN; // 監(jiān)視可讀性
// 調用 poll
int result = poll(fds, 1, timeout);
if (result == -1) {
perror("poll");
exit(EXIT_FAILURE);
} else if (result == 0) {
printf("發(fā)生超時。\n");
} else {
if (fds[0].revents & POLLIN) {
printf("數(shù)據(jù)準備好讀取。\n");
// 處理讀取操作中的數(shù)據(jù)
}
// 如果需要,可以檢查其他事件
}
return 0;
}
使用 epoll 的示例:
#include <sys/epoll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = /* 您的文件描述符 */;
// 將文件描述符添加到 epoll 監(jiān)聽
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, /* 您的文件描述符 */, &event) == -1) {
perror("epoll_ctl");
close(epoll_fd);
exit(EXIT_FAILURE);
}
struct epoll_event events[1];
int timeout = 5000; // 超時時間,單位為毫秒
// 調用 epoll_wait
int num_events = epoll_wait(epoll_fd, events, 1, timeout);
if (num_events == -1) {
perror("epoll_wait");
close(epoll_fd);
exit(EXIT_FAILURE);
} else if (num_events == 0) {
printf("發(fā)生超時。\n");
} else {
if (events[0].events & EPOLLIN) {
printf("數(shù)據(jù)準備好讀取。\n");
// 處理讀取操作中的數(shù)據(jù)
}
// 如果需要,可以檢查其他事件
}
close(epoll_fd);
return 0;
}
上述示例僅演示基本用法,實際使用中需要根據(jù)具體場景和需求進行適當?shù)男薷摹?/p>
9.3.3LT和ET模式
epoll對文件描述符的操作有兩種模式:LT(Level Trigger,電平觸發(fā))模式和ET(Edge Trigger,邊沿觸發(fā))模式。LT模式是默認的工作模式,這種模式下epoll相當于一個效率較 高的poll。當往epoll內核事件表中注冊一個文件描述符上的EPOLLET事件時,epoll將以 ET模式來操作該文件描述符。ET模式是epoll的高效工作模式。
對于采用LT工作模式的文件描述符,當epoll_wait檢測到其上有事件發(fā)生并將此 事件通知應用程序后,應用程序可以不立即處理該事件。這樣,當應用程序下一次調用 epoll_wait時,poll_wait還會再次向應用程序通告此事件,直到該事件被處理。而對于 采用ET工作模式的文件描述符,當epoll_wait檢測到其上有事件發(fā)生并將此事件通知應 用程序后,應用程序必須立即處理該事件,因為后續(xù)的epoll_wait 調用將不再向應用程序 通知這一事件。可見,ET模式在很大程度上降低了同一個epoll事件被重復觸發(fā)的次數(shù),因此效率要比LT模式高。代碼清單9-3體現(xiàn)了LT和ET在工作方式上的差異。
9-3mtlt.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
// 設置文件描述符為非阻塞模式
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
// 添加文件描述符到 epoll 監(jiān)聽
void addfd(int epollfd, int fd, bool enable_et) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if (enable_et) {
event.events |= EPOLLET; // 邊緣觸發(fā)模式
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
// 使用水平觸發(fā)模式處理事件
void lt(epoll_event* events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
// 處理新的連接請求
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, false); // 使用水平觸發(fā)模式
} else if (events[i].events & EPOLLIN) {
// 有數(shù)據(jù)可讀
printf("event trigger once\n");
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret <= 0) {
close(sockfd);
continue;
}
printf("get %d bytes of content: %s\n", ret, buf);
} else {
printf("something else happened \n");
}
}
}
// 使用邊緣觸發(fā)模式處理事件
void et(epoll_event* events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
// 處理新的連接請求
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, true); // 使用邊緣觸發(fā)模式
} else if (events[i].events & EPOLLIN) {
// 有數(shù)據(jù)可讀
printf("event trigger once\n");
while (1) {
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
printf("read later\n");
break;
}
close(sockfd);
break;
} else if (ret == 0) {
close(sockfd);
} else {
printf("get %d bytes of content: %s\n", ret, buf);
}
}
} else {
printf("something else happened \n");
}
}
}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );
ret = listen( listenfd, 5 );
assert( ret != -1 );
epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
addfd( epollfd, listenfd, true );
while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )
{
printf( "epoll failure\n" );
break;
}
lt( events, ret, epollfd, listenfd );
//et( events, ret, epollfd, listenfd );
}
close( listenfd );
return 0;
}
9.3.4 EPOLLONESHOT事件
即使我們使用ET模式,一個socket上的某個事件還是可能被觸發(fā)多次。這在并發(fā)程序 中就會引起一個問題。比如一個線程(或進程,下同)在讀取完某個socket上的數(shù)據(jù)后開 始處理這些數(shù)據(jù),而在數(shù)據(jù)的處理過程中該socket上又有新數(shù)據(jù)可讀(EPOLLIN再次被觸 發(fā)),此時另外一個線程被喚醒來讀取這些新的數(shù)據(jù)。于是就出現(xiàn)了兩個線程同時操作一個 socket的局面。這當然不是我們期望的。我們期望的是一個socket連接在任一時刻都只被一個線程處理。這一點可以使用epoll的EPOLLONESHOT事件實現(xiàn)。
對于注冊了EPOLLONESHOT事件的文件描述符,操作系統(tǒng)最多觸發(fā)其上注冊的一個可讀、可寫或者異常事件,且只觸發(fā)一次,除非我們使用epoll_ctl函數(shù)重置該文件描述符上注 冊的EPOLLONESHOT事件。這樣,當一個線程在處理某個socket時,其他線程是不可能有 機會操作該socket的。但反過來思考,注冊了EPOLLONESHOT事件的socket一旦被某個 線程處理完畢,該線程就應該立即重置這個socket上的EPOLLONESHOT事件,以確保這個socket下一次可讀時,其EPOLLIN事件能被觸發(fā),進而讓其他工作線程有機會繼續(xù)處理這 個 socket。
代碼清單9-4展示了EPOLLONESHOT事件的使用。
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds {
int epollfd;
int sockfd;
};
// 設置文件描述符為非阻塞模式
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
// 添加文件描述符到 epoll 監(jiān)聽
void addfd(int epollfd, int fd, bool oneshot) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET; // 默認使用邊緣觸發(fā)模式
if (oneshot) {
event.events |= EPOLLONESHOT; // 使用 EPOLLONESHOT 保證一個 socket 連接在任意時刻只被一個線程處理
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
// 重置文件描述符上的 EPOLLONESHOT 事件
void reset_oneshot(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
// 工作線程,負責處理某個 socket 上的數(shù)據(jù)
void* worker(void* arg) {
// 獲取傳遞給線程的參數(shù),包含要處理的 socket 文件描述符和 epoll 文件描述符
int sockfd = ((fds*)arg)->sockfd;
int epollfd = ((fds*)arg)->epollfd;
printf("start new thread to receive data on fd: %d\n", sockfd);
char buf[BUFFER_SIZE];
memset(buf, '\0', BUFFER_SIZE);
// 循環(huán)讀取數(shù)據(jù)
while (1) {
// 使用 recv 從 sockfd 接收數(shù)據(jù)
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret == 0) {
// 連接關閉,關閉 sockfd 并打印信息
close(sockfd);
printf("foreigner closed the connection\n");
break;
} else if (ret < 0) {
// 非阻塞模式下,如果沒有數(shù)據(jù)可讀,errno 為 EAGAIN,重新設置 EPOLLONESHOT 事件并退出循環(huán)
if (errno == EAGAIN) {
reset_oneshot(epollfd, sockfd);
printf("read later\n");
break;
}
} else {
// 成功接收到數(shù)據(jù),打印數(shù)據(jù)內容并休眠 5 秒
printf("get content: %s\n", buf);
sleep(5);
}
}
printf("end thread receiving data on fd: %d\n", sockfd);
}
int main(int argc, char* argv[]) {
if (argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
// 創(chuàng)建監(jiān)聽 socket
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
// 綁定地址和端口,監(jiān)聽連接
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
// 添加監(jiān)聽 socket 到 epoll 中
addfd(epollfd, listenfd, false);
// 循環(huán)等待事件
while (1) {
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if (ret < 0) {
printf("epoll failure\n");
break;
}
// 處理每個事件
for (int i = 0; i < ret; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
// 處理新的連接請求,添加新的連接到 epoll 中
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, true);
} else if (events[i].events & EPOLLIN) {
// 有數(shù)據(jù)可讀,創(chuàng)建一個工作線程來處理
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
pthread_create(&thread, NULL, worker, (void*)&fds_for_new_worker);
} else {
printf("something else happened \n");
}
}
}
// 關閉監(jiān)聽 socket
close(listenfd);
return 0;
}
從工作線程函數(shù)worker來看,如果一個工作線程處理完某個socket上的一次請求(我們用休眠5s來模擬這個過程)之后,又接收到該socket上新的客戶請求,則該線程將繼續(xù) 為這個socket服務。并且因為該socket上注冊了EPOLLONESHOT事件,其他線程沒有機會接觸這個socket,如果工作線程等待5s后仍然沒收到該socket 上的下一批客戶數(shù)據(jù),則 它將放棄為該socket服務。同時,它調用reset_oneshot 函數(shù)來重置該socket上的注冊事件, 這將使epoll有機會再次檢測到該socket 上的EPOLLIN事件,進而使得其他線程有機會為該socket服務。
由此看來,盡管一個socket在不同時間可能被不同的線程處理,但同一時刻肯定只有一個線程在為它服務。這就保證了連接的完整性,從而避免了很多可能的競態(tài)條件。
9.4 三組I/O復用函數(shù)的比較
前面我們討論了select、poll和epoll三組I/O復用系統(tǒng)調用,這3組系統(tǒng)調用都能同時監(jiān)聽多個文件描述符。它們將等I待由timeout參數(shù)指定的超時時間,直到一個或者多個文件描述符上有事件發(fā)生時返回,返回值是就緒的文件描述符的數(shù)量。返回0表示沒有事件發(fā)生。現(xiàn)在我們從事件集、最大支持文件描述符數(shù)、工作模式和具體實現(xiàn)等四個方面進一步比較它們的異同,以明確在實際應用中應該選擇使用哪個(或哪些)。
這3組函數(shù)都通過某種結構體變量來告訴內核監(jiān)聽哪些文件描述符上的哪些事件,并使用該結構體類型的參數(shù)來獲取內核處理的結果。
select的參數(shù)類型fd_set沒有將文件描述符和事件綁定,它僅僅是一個文件描述符集合,因此select需要提供3個這種類型的參數(shù)來分別傳入和輸出可讀、可寫及異常等事件。這一方面使得select不能處理更多類型的事件,另 一方面由于內核對fd_set集合的在線修改,應用程序下次調用select 前不得不重置這3個fd set集合。
poll的參數(shù)類型pollfd則多少“聰明”一些。它把文件描述符和事件都定義其中, 任何事件都被統(tǒng)一處理,從而使得編程接口簡潔得多。并且內核每次修改的是pollfd結構體 的revents成員,而events成員保持不變,因此下次調用poll 時應用程序無須重置 pollfd類型的事件集參數(shù)。由于每次select和poll調用都返回整個用戶注冊的事件集合(其中包括就 緒的和未就緒的),所以應用程序索引就緒文件描述符的時間復雜度為O(n)。
epoll則采用 與select和poll完全不同的方式來管理用戶注冊的事件。它在內核中維護一個事件表,并提供了一個獨立的系統(tǒng)調用epoll_ctl來控制往其中添加、刪除、修改事件。這樣,每次epoll wait調用都直接從該內核事件表中取得用戶注冊的事件,而無須反復從用戶空間讀入這些事 件。epoll_wait系統(tǒng)調用的events參數(shù)僅用來返回就緒的事件,這使得應用程序索引就緒文 件描述符的時間復雜度達到O(1)。
poll和epoll_wait 分別用nfds和maxevents參數(shù)指定最多監(jiān)聽多少個文件描述符和事件。 這兩個數(shù)值都能達到系統(tǒng)允許打開的最大文件描述符數(shù)目,即65535(cat/proc/sys/fs/file- max)。而select允許監(jiān)聽的最大文件描述符數(shù)量通常有限制。雖然用戶可以修改這個限制, 但這可能導致不可預期的后果。
select和poll都只能工作在相對低效的LT模式,而epoll則可以工作在ET高效模式。 并且epoll還支持EPOLLONESHOT事件。該事件能進一步減少可讀、可寫和異常等事件被 觸發(fā)的次數(shù)。
從實現(xiàn)原理上來說,select和poll采用的都是輪詢的方式,即每次調用都要掃描整個注冊文件描述符集合,并將其中就緒的文件描述符返回給用戶程序,因此它們檢測就緒事件的算法的時間復雜度是O(n)。epoll_wait則不同,它采用的是回調的方式。內核檢測到就緒 的文件描述符時,將觸發(fā)回調函數(shù),回調函數(shù)就將該文件描述符上對應的事件插入內核就緒 事件隊列。內核最后在適當?shù)臅r機將該就緒事件隊列中的內容拷貝到用戶空間。因此epoll wait無須輪詢整個文件描述符集合來檢測哪些事件已經(jīng)就緒,其算法時間復雜度是O(1)。 但是,當活動連接比較多的時候,epoll_wait 的效率未必比 select和poll高,因為此時回調函 數(shù)被觸發(fā)得過于頻繁。所以epoll_wait 適用于連接數(shù)量多,但活動連接較少的情況。
最后,為了便于閱讀,我們將這3組I/O復用系統(tǒng)調用的區(qū)別總結于表9-2中。
9.5 I/O復用的高級應用一:非阻塞connect
connect出錯時有一種errno值是EINPROGRESS。這種錯誤發(fā)生在對非阻塞的socket調用connect,而連接又沒有立即建立時。在這種情況下, 我們可以調用select、poll等函數(shù)來監(jiān)聽這個連接失敗的socket上的可寫事件。當select、 poll等函數(shù)返回后,再利用getsockopt來讀取錯誤碼并清除該socket上的錯誤。如果錯誤碼是0,表示連接成功建立,否則連接失敗。通過上面描述的非阻塞connect方式,我們就能同時發(fā)起多個連接并一起等待。下面看看非阻塞connect的一種實現(xiàn),如代碼清單9-5所示。
9-5unblockconnect.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <time.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <string.h>
#define BUFFER_SIZE 1023
// 設置套接字為非阻塞模式
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
// 非阻塞 connect 函數(shù)
int unblock_connect(const char* ip, int port, int time)
{
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
int fdopt = setnonblocking(sockfd);
// 嘗試發(fā)起連接
ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address));
if (ret == 0)
{
// 連接成功
printf("Connect with server immediately\n");
fcntl(sockfd, F_SETFL, fdopt);
return sockfd;
}
else if (errno != EINPROGRESS)
{
// 非阻塞連接不被支持
printf("Unblock connect not supported\n");
return -1;
}
fd_set readfds;
fd_set writefds;
struct timeval timeout;
FD_ZERO(&readfds);
FD_SET(sockfd, &writefds);
timeout.tv_sec = time;
timeout.tv_usec = 0;
// 使用 select 等待連接建立
ret = select(sockfd + 1, NULL, &writefds, NULL, &timeout);
if (ret <= 0)
{
// 連接超時
printf("Connection time out\n");
close(sockfd);
return -1;
}
if (!FD_ISSET(sockfd, &writefds))
{
// 未在 sockfd 上發(fā)現(xiàn)事件
printf("No events on sockfd found\n");
close(sockfd);
return -1;
}
int error = 0;
socklen_t length = sizeof(error);
// 檢查連接是否成功
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0)
{
printf("Get socket option failed\n");
close(sockfd);
return -1;
}
if (error != 0)
{
// 連接失敗
printf("Connection failed after select with the error: %d \n", error);
close(sockfd);
return -1;
}
// 連接準備就緒
printf("Connection ready after select with the socket: %d \n", sockfd);
fcntl(sockfd, F_SETFL, fdopt);
return sockfd;
}
int main(int argc, char* argv[])
{
if (argc <= 2)
{
printf("Usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int sockfd = unblock_connect(ip, port, 10);
if (sockfd < 0)
{
return 1;
}
// 關閉寫端,模擬發(fā)送數(shù)據(jù)
shutdown(sockfd, SHUT_WR);
sleep(200);
printf("Send data out\n");
send(sockfd, "abc", 3, 0);
//sleep(600);
return 0;
}
注釋:
-
setnonblocking
: 設置套接字為非阻塞模式的函數(shù),返回舊的套接字選項。 -
unblock_connect
: 實現(xiàn)非阻塞 connect 的函數(shù),嘗試發(fā)起連接,如果連接成功,返回連接的套接字;如果連接正在進行中,使用select
等待連接建立。 -
main
: 主函數(shù),接收命令行參數(shù),調用unblock_connect
嘗試進行非阻塞連接,之后關閉寫端并模擬發(fā)送數(shù)據(jù)。
但遺憾的是,這種方法存在幾處移植性問題。首先,非阻塞的socket 可能導致connect 始終失敗。其次,select對處于EINPROGRESS狀態(tài)下的socket可能不起作用。最后,對于 出錯的socket,getsockopt在有些系統(tǒng)(比如Linux)上返回-1, 而在有些系統(tǒng)(比如源自伯克利的UNIX)上則返回0。這些問題沒有一個統(tǒng)一的解決方法。
9.6 I/O復用的高級應用二:聊天室程序
像ssh這樣的登錄服務通常要同時處理網(wǎng)絡連接和用戶輸入,這也可以使用I/O復用來實現(xiàn)。
本節(jié)我們以poll為例實現(xiàn)一個簡單的聊天室程序,以闡述如何使用I/O復用技術來同時處理網(wǎng)絡連接和用戶輸入。該聊天室程序能讓所有用戶同時在線群聊,它分為客戶端和服務器兩個部分。其中客戶端程序有兩個功能:一是從標準輸入終端讀入用戶數(shù)據(jù),并將用戶數(shù)據(jù)發(fā)送至服務器;二是往標準輸出終端打印服務器發(fā)送給它的數(shù)據(jù)。服務器的功能是接收 客戶數(shù)據(jù),并把客戶數(shù)據(jù)發(fā)送給每一個登錄到該服務器上的客戶端(數(shù)據(jù)發(fā)送者除外)。下 面我們依次給出客戶端程序和服務器程序的代碼。
9.6.1客戶端
客戶端程序使用poll同時監(jiān)聽用戶輸入和網(wǎng)絡連接,并利用splice函數(shù)將用戶輸入內容 直接定向到網(wǎng)絡連接上以發(fā)送之,從而實現(xiàn)數(shù)據(jù)零拷貝,提高了程序執(zhí)行效率??蛻舳顺绦虼a如下。
9-6mytalk_client.cpp
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>
#define BUFFER_SIZE 64
int main(int argc, char* argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
// 創(chuàng)建服務器地址結構體
struct sockaddr_in server_address;
bzero(&server_address, sizeof(server_address));
server_address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &server_address.sin_addr);
server_address.sin_port = htons(port);
// 創(chuàng)建客戶端套接字
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
// 嘗試連接服務器
if (connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0)
{
printf("Connection failed\n");
close(sockfd);
return 1;
}
// 創(chuàng)建 pollfd 數(shù)組,用于監(jiān)視標準輸入和 sockfd 上的事件
pollfd fds[2];
fds[0].fd = 0; // 標準輸入
fds[0].events = POLLIN; // 監(jiān)視讀事件
fds[0].revents = 0;
fds[1].fd = sockfd; // 客戶端套接字
fds[1].events = POLLIN | POLLRDHUP; // 監(jiān)視讀事件和對端連接斷開事件
fds[1].revents = 0;
char read_buf[BUFFER_SIZE];
int pipefd[2];
// 創(chuàng)建管道
int ret = pipe(pipefd);
assert(ret != -1);
while (1)
{
// 使用 poll 等待事件發(fā)生
ret = poll(fds, 2, -1);
if (ret < 0)
{
printf("poll failure\n");
break;
}
// 處理服務器斷開連接事件
if (fds[1].revents & POLLRDHUP)
{
printf("Server close the connection\n");
break;
}
// 處理服務器發(fā)送的數(shù)據(jù)事件
else if (fds[1].revents & POLLIN)
{
memset(read_buf, '\0', BUFFER_SIZE);
recv(fds[1].fd, read_buf, BUFFER_SIZE - 1, 0);
printf("%s\n", read_buf);
}
// 處理標準輸入事件
if (fds[0].revents & POLLIN)
{
// 從標準輸入讀取數(shù)據(jù),并通過管道發(fā)送給服務器
ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
}
}
// 關閉客戶端套接字
close(sockfd);
return 0;
}
注釋:
-
server_address
: 服務器地址結構體,存儲要連接的服務器的 IP 地址和端口號。 -
sockfd
: 客戶端套接字,用于與服務器建立連接和進行通信。 -
fds[2]
:pollfd
數(shù)組,用于監(jiān)視標準輸入和客戶端套接字上的事件。 -
pipefd[2]
: 管道,用于在標準輸入和客戶端套接字之間傳輸數(shù)據(jù)。 -
poll
: 使用poll
函數(shù)等待事件發(fā)生,處理標準輸入、客戶端套接字的讀事件和對端連接斷開事件。 -
splice
: 使用splice
函數(shù)將標準輸入的數(shù)據(jù)發(fā)送給服務器,并將服務器發(fā)送的數(shù)據(jù)寫入標準輸出。
9.6.2服務器
服務器程序使用poll同時管理監(jiān)聽 socket和連接socket,并且使用犧牲空間換取時間的 策略來提高服務器性能,如代碼清單9-7所示。
9-7mytalk_server.cpp
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>
#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535
struct client_data
{
sockaddr_in address; // 存儲客戶端地址信息
char* write_buf; // 寫緩沖區(qū)
char buf[BUFFER_SIZE]; // 讀緩沖區(qū)
};
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
int main(int argc, char* argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
// 初始化客戶端數(shù)據(jù)數(shù)組和 pollfd 數(shù)組
client_data* users = new client_data[FD_LIMIT];
pollfd fds[USER_LIMIT + 1];
int user_counter = 0;
// 初始化 pollfd 數(shù)組,將監(jiān)聽套接字加入數(shù)組
for (int i = 1; i <= USER_LIMIT; ++i)
{
fds[i].fd = -1;
fds[i].events = 0;
}
fds[0].fd = listenfd;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0;
while (1)
{
// 使用 poll 等待事件發(fā)生
ret = poll(fds, user_counter + 1, -1);
if (ret < 0)
{
printf("poll failure\n");
break;
}
// 處理監(jiān)聽套接字上的事件
if ((fds[0].fd == listenfd) && (fds[0].revents & POLLIN))
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
if (connfd < 0)
{
printf("errno is: %d\n", errno);
continue;
}
// 處理連接數(shù)超過限制的情況
if (user_counter >= USER_LIMIT)
{
const char* info = "too many users\n";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}
// 客戶端連接數(shù)加1,設置連接套接字為非阻塞
user_counter++;
users[connfd].address = client_address;
setnonblocking(connfd);
fds[user_counter].fd = connfd;
fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
fds[user_counter].revents = 0;
printf("comes a new user, now have %d users\n", user_counter);
}
else
{
// 處理其他連接上的事件
for (int i = 1; i <= user_counter; ++i)
{
if (fds[i].revents & POLLERR)
{
printf("get an error from %d\n", fds[i].fd);
char errors[100];
memset(errors, '\0', 100);
socklen_t length = sizeof(errors);
// 獲取套接字錯誤信息
if (getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0)
{
printf("get socket option failed\n");
}
continue;
}
else if (fds[i].revents & POLLRDHUP)
{
// 客戶端斷開連接,處理相關信息
users[fds[i].fd] = users[fds[user_counter].fd];
close(fds[i].fd);
fds[i] = fds[user_counter];
i--;
user_counter--;
printf("a client left\n");
}
else if (fds[i].revents & POLLIN)
{
// 讀取客戶端數(shù)據(jù)
int connfd = fds[i].fd;
memset(users[connfd].buf, '\0', BUFFER_SIZE);
ret = recv(connfd, users[connfd].buf, BUFFER_SIZE - 1, 0);
printf("get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd);
if (ret < 0)
{
if (errno != EAGAIN)
{
close(connfd);
users[fds[i].fd] = users[fds[user_counter].fd];
fds[i] = fds[user_counter];
i--;
user_counter--;
}
}
else if (ret == 0)
{
printf("code should not come to here\n");
}
else
{
// 將客戶端數(shù)據(jù)發(fā)送給其他連接的客戶端
for (int j = 1; j <= user_counter; ++j)
{
if (fds[j].fd == connfd)
{
continue;
}
fds[j].events |= ~POLLIN;
fds[j].events |= POLLOUT;
users[fds[j].fd].write_buf = users[connfd].buf;
}
}
}
else if (fds[i].revents & POLLOUT)
{
// 發(fā)送數(shù)據(jù)給客戶端
int connfd = fds[i].fd;
if (!users[connfd].write_buf)
{
continue;
}
ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0);
users[connfd].write_buf = NULL;
fds[i].events |= ~POLLOUT;
fds[i].events |= POLLIN;
}
}
}
}
delete[] users;
close(listenfd);
return 0;
}
工作流程:
- 創(chuàng)建監(jiān)聽套接字
listenfd
,綁定地址并開始監(jiān)聽。 - 初始化客戶端數(shù)據(jù)結構體數(shù)組
users
和pollfd
數(shù)組fds
。 - 在
fds
數(shù)組中將監(jiān)聽套接字listenfd
加入,設置監(jiān)聽事件為POLLIN
和POLLERR
。 - 進入主循環(huán),使用
poll
等待事件發(fā)生。 - 處理監(jiān)聽套接字上的事件,如果有新的客戶端連接,則接受連接,設置為非阻塞,并加入
fds
數(shù)組中。 - 處理其他連接上的事件,包括錯誤、對端連接斷開、有數(shù)據(jù)可讀、有數(shù)據(jù)可寫。
- 對于數(shù)據(jù)可讀的連接,將數(shù)據(jù)讀取并發(fā)送給其他連接的客戶端。
- 對于數(shù)據(jù)可寫的連接,將數(shù)據(jù)發(fā)送給客戶端。
- 循環(huán)處理事件,直到發(fā)生錯誤或中斷。
該程序使用 poll
實現(xiàn)了一個簡單的多客戶端聊天室服務器,通過非阻塞套接字和事件驅動的方式處理多個連接,支持同時連接多個客戶端。
9.7 I/O復用的高級應用三:同時處理TCP和UDP服務
至此,我們討論過的服務器程序都只監(jiān)聽一個端口。在實際應用中,有不少服務器程序能同時監(jiān)聽多個端口,比如超級服務inetd和android的調試服務adbd。從bind系統(tǒng)調用的參數(shù)來看,一個socket只能與一個socket地址綁定,即一個socket 只能用來監(jiān)聽一個端口。因此,服務器如果要同時監(jiān)聽多個端口,就必須創(chuàng)建多個socket, 并將它們分別綁定到各個端口上。這樣一來,服務器程序就需要同時管理多個監(jiān)聽socket, I/O復用技術就有了用武之地。另外,即使是同一個端口,如果服務器要同時處理該端口上 的TCP和UDP請求,則也需要創(chuàng)建兩個不同的socket:一個是流socket,另一個是數(shù)據(jù)報socket,并將它們都綁定到該端口上。
比如代碼清單9-8所示的回射服務器就能同時處理一 個端口上的TCP和UDP請求。
9-8multi_port.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024
// 設置套接字為非阻塞
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
// 將文件描述符加入 epoll 事件監(jiān)聽集合
void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
int main(int argc, char* argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
// 初始化 TCP 套接字
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
// 初始化 UDP 套接字
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int udpfd = socket(PF_INET, SOCK_DGRAM, 0);
assert(udpfd >= 0);
ret = bind(udpfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
// 將 TCP 和 UDP 套接字加入 epoll 事件監(jiān)聽集合
addfd(epollfd, listenfd);
addfd(epollfd, udpfd);
while (1)
{
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0)
{
printf("epoll failure\n");
break;
}
for (int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
// 處理新的 TCP 連接
if (sockfd == listenfd)
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd);
}
// 處理 UDP 數(shù)據(jù)
else if (sockfd == udpfd)
{
char buf[UDP_BUFFER_SIZE];
memset(buf, '\0', UDP_BUFFER_SIZE);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
ret = recvfrom(udpfd, buf, UDP_BUFFER_SIZE - 1, 0, (struct sockaddr*)&client_address, &client_addrlength);
if (ret > 0)
{
sendto(udpfd, buf, UDP_BUFFER_SIZE - 1, 0, (struct sockaddr*)&client_address, client_addrlength);
}
}
// 處理 TCP 數(shù)據(jù)
else if (events[i].events & EPOLLIN)
{
char buf[TCP_BUFFER_SIZE];
while (1)
{
memset(buf, '\0', TCP_BUFFER_SIZE);
ret = recv(sockfd, buf, TCP_BUFFER_SIZE - 1, 0);
if (ret < 0)
{
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
break;
}
close(sockfd);
break;
}
else if (ret == 0)
{
close(sockfd);
}
else
{
send(sockfd, buf, ret, 0);
}
}
}
else
{
printf("something else happened \n");
}
}
}
close(listenfd);
return 0;
}
代碼解釋:
-
setnonblocking
: 將套接字設置為非阻塞模式。 -
addfd
: 將套接字加入 epoll 事件監(jiān)聽集合。 - 創(chuàng)建 TCP 套接字
listenfd
,綁定并監(jiān)聽。 - 創(chuàng)建 UDP 套接字
udpfd
,綁定。 - 創(chuàng)建 epoll 實例
epollfd
,將 TCP 和 UDP 套接字加入監(jiān)聽集合。 - 進入主循環(huán),調用
epoll_wait
等待事件發(fā)生。 - 處理監(jiān)聽套接字上的事件,有新的 TCP 連接則接受連接并加入監(jiān)聽。
- 處理 UDP 套接字上的事件,接收數(shù)據(jù)并發(fā)送給客戶端。
- 處理其他連接上的事件,包括接收 TCP 數(shù)據(jù)和發(fā)送數(shù)據(jù)。
- 主循環(huán)中斷后關閉套接字。
9.8超級服務xinetd
Linux因特網(wǎng)服務inetd是超級服務。它同時管理著多個子服務,即監(jiān)聽多個端口?,F(xiàn)在 Linux系統(tǒng)上使用的inetd服務程序通常是其升級版本xinetd。xinetd程序的原理與inetd相同,但增加了一些控制選項,并提高了安全性。
xinetd
(Extended Internet Services Daemon)是一個在 Unix 系統(tǒng)上運行的守護進程,用于管理網(wǎng)絡服務的啟動和停止。它是 inetd
(Internet Services Daemon)的增強版本,提供了更多的功能和配置選項。以下是關于 xinetd
的詳細介紹:
特點和功能:
-
服務管理:
xinetd
主要負責啟動和停止網(wǎng)絡服務。它監(jiān)聽指定的端口,并根據(jù)客戶端請求啟動相應的服務。服務可以是任何可執(zhí)行程序。 -
資源節(jié)?。?/strong> 與常規(guī)的獨立守護進程相比,
xinetd
以超級服務器的形式運行。這意味著它僅在有連接請求時啟動相應的服務,而不是一直運行。這有助于節(jié)省系統(tǒng)資源。 -
并發(fā)連接控制:
xinetd
允許管理員限制同時連接到服務的客戶端數(shù)量。這對于控制服務器資源的使用非常有用。 -
Access Control: 提供了靈活的訪問控制機制,可以根據(jù) IP 地址、網(wǎng)絡掩碼、域名等對服務的訪問進行限制。
-
日志記錄:
xinetd
支持詳細的日志記錄,可幫助管理員跟蹤連接和服務的使用情況。 -
超時設置: 管理連接的超時設置,當連接在一定時間內沒有活動時,
xinetd
可以選擇關閉連接。 -
重試設置: 允許管理員配置連接失敗后的重試次數(shù)。
-
啟動參數(shù): 可以向服務傳遞額外的啟動參數(shù)。
配置文件:
xinetd
的配置文件通常位于 /etc/xinetd.conf
或 /etc/xinetd.d/
目錄下,文件名通常是服務的名字。配置文件采用簡單的鍵值對格式。
示例配置文件(/etc/xinetd.d/telnet
):
service telnet
{
disable = no
socket_type = stream
wait = no
user = root
server = /usr/sbin/in.telnetd
log_on_failure += USERID
}
常用命令:
-
啟動
xinetd
:sudo service xinetd start
-
停止
xinetd
:sudo service xinetd stop
-
重啟
xinetd
:sudo service xinetd restart
使用場景:
-
簡化服務管理: 對于系統(tǒng)管理員而言,
xinetd
提供了一種簡便的方式來啟動和管理網(wǎng)絡服務,而無需手動管理多個獨立的守護進程。 -
資源節(jié)?。?/strong> 在資源有限的系統(tǒng)上,
xinetd
可以幫助節(jié)省系統(tǒng)資源,因為它只在需要時啟動服務。 -
訪問控制:
xinetd
提供了強大的訪問控制機制,允許管理員細粒度地控制服務的訪問。 -
監(jiān)控和日志: 通過日志記錄功能,
xinetd
可以用于監(jiān)控服務的使用情況,有助于及時發(fā)現(xiàn)和解決問題。
總體而言,xinetd
是一個靈活而功能強大的超級服務器,適用于需要管理多個網(wǎng)絡服務的環(huán)境。
工作流程如下
文章來源:http://www.zghlxwxcb.cn/news/detail-812653.html
后記
截至2024年1月20日20點32分,完成第九章的學習,了解了select,poll和epoll幾個系統(tǒng)調用,對I/O復用的相關知識有了一定程度的了解。文章來源地址http://www.zghlxwxcb.cn/news/detail-812653.html
到了這里,關于《Linux高性能服務器編程》筆記04的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!