前面我們學(xué)習(xí)的非阻塞IO,雖然能夠在數(shù)據(jù)不就緒的時(shí)候處理其他事情,但是還是有一些不方便,而且每次都要為了一個(gè)文件描述符而進(jìn)行等待,所以為了提高IO效率我們還要學(xué)習(xí)IO多路轉(zhuǎn)接技術(shù)。
一、I/O多路轉(zhuǎn)接之select
1、select函數(shù)
select
是系統(tǒng)提供的一個(gè)多路轉(zhuǎn)接接口。
函數(shù)原型:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
功能:
-
select
系統(tǒng)調(diào)用可以讓我們的程序同時(shí)監(jiān)視多個(gè)文件描述符的上的事件是否就緒。 -
select
的核心工作就是等,當(dāng)監(jiān)視的多個(gè)文件描述符中有一個(gè)或多個(gè)事件就緒時(shí),select
才會(huì)成功返回并將對(duì)應(yīng)文件描述符的就緒事件告知給調(diào)用者。
參數(shù)說(shuō)明:
-
nfds
:需要監(jiān)視的文件描述符中,最大的文件描述符的值+1,例如一個(gè)進(jìn)程打開(kāi)了0, 1, 2 ,3
四個(gè)文件描述符,我們想要對(duì)這四個(gè)文件描述符都進(jìn)行監(jiān)控,我們就需要填寫(xiě)4
。 -
readfds
:輸入輸出型參數(shù),其類型fd_set
是一個(gè)位圖結(jié)構(gòu)。- 調(diào)用時(shí)用戶通過(guò)設(shè)置對(duì)應(yīng)的比特位為1,告知內(nèi)核需要監(jiān)視哪些文件描述符的讀事件是否就緒,
- 返回時(shí)內(nèi)核通過(guò)設(shè)置對(duì)應(yīng)的比特位為1,告知用戶哪些文件描述符的讀事件已經(jīng)就緒。
-
writefds
:輸入輸出型參數(shù),其類型fd_set
是一個(gè)位圖結(jié)構(gòu)。- 調(diào)用時(shí)用戶通過(guò)設(shè)置對(duì)應(yīng)的比特位為1,告知內(nèi)核需要監(jiān)視哪些文件描述符的寫(xiě)事件是否就緒。
- 返回時(shí)內(nèi)核通過(guò)設(shè)置對(duì)應(yīng)的比特位為1,告知用戶哪些文件描述符的寫(xiě)事件已經(jīng)就緒。
-
exceptfds
:輸入輸出型參數(shù),其類型fd_set
是一個(gè)位圖結(jié)構(gòu)。- 調(diào)用時(shí)用戶通過(guò)設(shè)置對(duì)應(yīng)的比特位為1,告知內(nèi)核需要監(jiān)視哪些文件描述符的異常事件是否就緒。
- 返回時(shí)內(nèi)核通過(guò)設(shè)置對(duì)應(yīng)的比特位為1,告知用戶哪些文件描述符的異常事件已經(jīng)就緒。
-
timeout
:輸入輸出型參數(shù)- 調(diào)用時(shí)由用戶設(shè)置
select
函數(shù)在其內(nèi)部的等待數(shù)據(jù)的時(shí)間。 - 返回時(shí)表示
timeout
的剩余時(shí)間。
- 調(diào)用時(shí)由用戶設(shè)置
參數(shù)timeout
的值:
-
NULL/nullptr
:select
調(diào)用后進(jìn)行阻塞等待,直到被監(jiān)視的某個(gè)文件描述符上的某個(gè)事件就緒才返回。 -
0
:selec
調(diào)用后進(jìn)行非阻塞等待,無(wú)論被監(jiān)視的文件描述符上的事件是否就緒,select
檢測(cè)后都會(huì)立即返回。 -
特定的時(shí)間值:
select
調(diào)用后在指定的時(shí)間內(nèi)進(jìn)行阻塞等待,如果被監(jiān)視的文件描述符上一直沒(méi)有事件就緒,則在該時(shí)間后select
進(jìn)行超時(shí)返回。
timeval
結(jié)構(gòu)體的定義:
返回值說(shuō)明:
- 如果函數(shù)調(diào)用成功,則返回有事件就緒的文件描述符個(gè)數(shù)。
- 如果
timeout
時(shí)間耗盡,則返回0。 - 如果函數(shù)調(diào)用失敗,則返回
-1
,同時(shí)錯(cuò)誤碼會(huì)被設(shè)置。
select
調(diào)用失敗時(shí),錯(cuò)誤碼可能被設(shè)置為:
-
EBADF
:文件描述符為無(wú)效的或該文件已關(guān)閉。 -
EINTR
:此調(diào)用被信號(hào)所中斷。 -
EINVAL
:參數(shù)不合法,例如nfds為負(fù)值。 -
ENOMEM
:核心內(nèi)存不足。
2、fd_set的相關(guān)內(nèi)容
fd_set
本質(zhì)也是一個(gè)位圖,用位圖中對(duì)應(yīng)的位來(lái)表示要監(jiān)視的文件描述符fd_set
結(jié)構(gòu)如下:
操作fd_set的相關(guān)接口
我們要調(diào)用select
函數(shù)之前就需要先用fd_set
結(jié)構(gòu)定義出對(duì)應(yīng)的文件描述符集,然后將需要監(jiān)視的文件描述符添加到文件描述符集當(dāng)中,這個(gè)添加的過(guò)程本質(zhì)就是在進(jìn)行位操作,但是這個(gè)位操作不需要用戶進(jìn)行,系統(tǒng)提供了一組專門的宏接口,用于對(duì)fd_set
類型的位圖進(jìn)行各種操作。
void FD_CLR(int fd, fd_set *set); //用來(lái)清除文件描述符集set中相關(guān)fd的位
int FD_ISSET(int fd, fd_set *set); //用來(lái)測(cè)試文件描述符集set中相關(guān)fd的位是否為真
void FD_SET(int fd, fd_set *set); //用來(lái)設(shè)置文件描述符集set中相關(guān)fd的位
void FD_ZERO(fd_set *set); //用來(lái)清除文件描述符集set的全部位
fd_set的大小
我們可以使用下面的代碼進(jìn)行測(cè)試fd_set
的大?。?/p>
#include <iostream>
#include <sys/select.h>
int main()
{
std::cout << sizeof(fd_set) << std::endl;
return 0;
}
運(yùn)行結(jié)果:
這個(gè)位圖的上限是
128
?
8
=
1024
128 * 8 = 1024
128?8=1024,也就是我們的select
系統(tǒng)調(diào)用最多同時(shí)等待1024
個(gè)文件描述符。
而在Linux下我們可以使用ulimit -a
命令查看我們一個(gè)進(jìn)程最多打開(kāi)的文件描述符的個(gè)數(shù):
顯然select
并不能夠?qū)σ粋€(gè)進(jìn)程的所有文件描述符都進(jìn)行等待。
3、如何在代碼中高效的使用select函數(shù)
在了解如何在代碼中高效的使用select
函數(shù)之前,我們先來(lái)看一看select
的基本使用過(guò)程。
- 創(chuàng)建
fd_set
set變量,并使用FD_ZERO()
函數(shù)進(jìn)行初始化,則set用bit位表示是0000,0000。 - 若fd= 5,執(zhí)行
FD_SET(fd,&set)
后set變?yōu)?0001,0000
(第5位置為1)。 - 若再加入fd= 2, fd=1,則set變?yōu)?
0001,0011
。 - 執(zhí)行
select(6,&set,nullptr,nullptr,nullptr)
阻塞等待。 - 若fd=1,fd=2上都發(fā)生可讀事件,則
select
返回,此時(shí)set變?yōu)?0000,0011
。注意:沒(méi)有事件發(fā)生的fd=5被清空了。
所以我們會(huì)發(fā)現(xiàn)select
每次調(diào)用都需要我們重新設(shè)置要關(guān)心的文件描述符,因此select
服務(wù)器,使用的時(shí)候,需要程序員自己維護(hù)一個(gè)第三方數(shù)組,來(lái)進(jìn)行對(duì)已經(jīng)獲得的sock
進(jìn)行管理!
select使用的基本工作流程
如果我們要實(shí)現(xiàn)一個(gè)簡(jiǎn)單的select服務(wù)器,該服務(wù)器要做的就是讀取客戶端發(fā)來(lái)的數(shù)據(jù)進(jìn)行回發(fā)并打印,那么這個(gè)select
服務(wù)器的工作流程應(yīng)該是這樣的:
-
先初始化服務(wù)器,完成套接字的創(chuàng)建、綁定和監(jiān)聽(tīng)。
-
定義一個(gè)fd_array數(shù)組用于保存監(jiān)聽(tīng)套接字和已經(jīng)與客戶端建立連接的套接字。
-
然后服務(wù)器開(kāi)始循環(huán)調(diào)用select函數(shù),檢測(cè)讀事件是否就緒,如果就緒則執(zhí)行對(duì)應(yīng)的操作。
-
每次調(diào)用select函數(shù)之前,都需要定義一個(gè)讀文件描述符集readfds,并將fd_array當(dāng)中的文件描述符依次設(shè)置進(jìn)readfds當(dāng)中,表示讓select幫我們監(jiān)視這些文件描述符的讀事件是否就緒。
-
當(dāng)select檢測(cè)到數(shù)據(jù)就緒時(shí)會(huì)將讀事件就緒的文件描述符設(shè)置進(jìn)readfds當(dāng)中,此時(shí)我們就能夠得知哪些文件描述符的讀事件就緒了,并對(duì)這些文件描述符進(jìn)行對(duì)應(yīng)的操作。
-
如果讀事件就緒的是監(jiān)聽(tīng)套接字,則調(diào)用
accept
函數(shù)從底層全連接隊(duì)列獲取已經(jīng)建立好的連接,并將該連接對(duì)應(yīng)的套接字繼續(xù)添加到fd_array
數(shù)組當(dāng)中。 -
如果讀事件就緒的是與客戶端建立連接的套接字,則調(diào)用read函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù)并進(jìn)行打印輸出。
-
當(dāng)然,服務(wù)器與客戶端建立連接的套接字讀事件就緒,也可能是因?yàn)榭蛻舳藢⑦B接關(guān)閉了,此時(shí)服務(wù)器應(yīng)該調(diào)用
close
關(guān)閉該套接字,并將該套接字從fd_array數(shù)組當(dāng)中清除,因?yàn)橄乱淮尾恍枰俦O(jiān)視該文件描述符的讀事件了。
說(shuō)明一下:
-
因?yàn)閭魅雜elect函數(shù)的readfds、writefds和exceptfds都是輸入輸出型參數(shù),當(dāng)select函數(shù)返回時(shí)這些參數(shù)當(dāng)中的值已經(jīng)被修改了,因此每次調(diào)用select函數(shù)時(shí)都需要對(duì)其進(jìn)行重新設(shè)置,timeout也是類似的道理。
-
因?yàn)槊看握{(diào)用select函數(shù)之前都需要對(duì)readfds進(jìn)行重新設(shè)置,所以需要定義一個(gè)fd_array數(shù)組保存與客戶端已經(jīng)建立的若干連接和監(jiān)聽(tīng)套接字,實(shí)際fd_array數(shù)組當(dāng)中的文件描述符就是需要讓select監(jiān)視讀事件的文件描述符。
-
我們的select服務(wù)器只是讀取客戶端發(fā)來(lái)的數(shù)據(jù),因此只需要讓select幫我們監(jiān)視特定文件描述符的讀事件,如果要同時(shí)讓select幫我們監(jiān)視特定文件描述符的讀事件和寫(xiě)事件,則需要分別定義readfds和writefds,并定義兩個(gè)數(shù)組分別保存需要被監(jiān)視讀事件和寫(xiě)事件的文件描述符,便于每次調(diào)用select函數(shù)前對(duì)readfds和writefds進(jìn)行重新設(shè)置。
-
服務(wù)器剛開(kāi)始運(yùn)行時(shí),fd_array數(shù)組當(dāng)中只有監(jiān)聽(tīng)套接字,因此select第一次調(diào)用時(shí)只需要監(jiān)視監(jiān)聽(tīng)套接字的讀事件是否就緒,但每次調(diào)用accept獲取到新連接后,都會(huì)將新連接對(duì)應(yīng)的套接字添加到fd_array當(dāng)中,因此后續(xù)select調(diào)用時(shí)就需要監(jiān)視監(jiān)聽(tīng)套接字和若干連接套接字的讀事件是否就緒。
-
由于調(diào)用select時(shí)還需要傳入被監(jiān)視的文件描述符中最大文件描述符值+1,因此每次在遍歷fd_array對(duì)readfds進(jìn)行重新設(shè)置時(shí),還需要記錄最大文件描述符值。
這其中還有很多細(xì)節(jié),讓我們邊寫(xiě)代碼邊講解其中的細(xì)節(jié)。
4、select服務(wù)器
首先我們要對(duì)socket
的相關(guān)接口進(jìn)行一下封裝,以便于我們后續(xù)更好的使用,我們封裝的Sock
既能夠用于服務(wù)器,也能夠運(yùn)用于客戶端,當(dāng)然其中也涉及到了其他的文件和函數(shù),err.h
是錯(cuò)誤碼文件,logMessage
是一個(gè)日志打印函數(shù)。
// Sock.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <cerrno>
#include <cstdlib>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include "log.hpp"
#include "err.h"
class Sock
{
public:
Sock()
:_sock(-1)
{}
void Socket()
{
_sock = socket(AF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
logMessage(Fatal, "socket fail: %s", strerror(errno));
exit(SOCKET_ERR);
}
// 設(shè)置地址復(fù)用
int opt = 1;
if (setsockopt(_sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)))
{
logMessage(Fatal, "setsocket fail: %s", strerror(errno));
exit(SETSOCK_ERR);
}
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
socklen_t len = sizeof(local);
memset(&local, 0, len);
local.sin_family = AF_INET;
local.sin_addr.s_addr = INADDR_ANY;
local.sin_port = htons(port);
if (bind(_sock, (struct sockaddr*)&local, len) < 0)
{
logMessage(Fatal, "bind fail : %s", strerror(errno));
exit(BIND_ERR);
}
}
void Listen(int backlog = 32)
{
if (listen(_sock, backlog) < 0)
{
logMessage(Fatal, "listen fail : %s", strerror(errno));
exit(LISTEN_ERR);
}
}
int Accept(std::string* client_ip, uint16_t* client_port)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
memset(&client, 0, len);
int sockfd = accept(_sock, (struct sockaddr*)&client, &len);
if (sockfd < 0)
{
logMessage(Warning, "accept fail : %s", strerror(errno));
}
else
{
// 提取客戶端的相關(guān)信息
char buf[len];
inet_ntop(AF_INET, &client.sin_addr, buf, len);
*client_ip = buf;
*client_port = ntohs(client.sin_port);
}
// 不保證是正確的通信套接字,由外部自己判斷
return sockfd;
}
int Connect(const std::string& server_ip, uint16_t server_port)
{
struct sockaddr_in server;
socklen_t len = sizeof(server);
memset(&server, 0, len);
server.sin_family = AF_INET;
inet_pton(AF_INET, server_ip.c_str(), &server.sin_addr);
server.sin_port = htons(server_port);
// 不保證是正確的通信套接字,由外部自己判斷
return connect(_sock, (struct sockaddr*)&server, len);
}
int Fd()
{
return _sock;
}
void Close()
{
if (_sock >= 0)
{
close(_sock);
_sock = -1;
}
}
~Sock()
{
Close();
}
private:
int _sock; // 套接字,對(duì)于服務(wù)端來(lái)說(shuō)是監(jiān)聽(tīng)套接字,對(duì)于客戶端來(lái)說(shuō)是通信套接字
};
// log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
// 日志等級(jí)
enum { Debuge = 0, Info, Warning, Error, Fatal, Unkonw };
static std::string toLevelString(int level)
{
switch (level)
{
case Debuge:
return "Debuge";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unkonw";
}
}
static std::string getTime()
{
char buf[128];
time_t timep = time(nullptr);
struct tm stdtm;
localtime_r(&timep, &stdtm);
snprintf(buf, sizeof(buf), "%d-%d-%d %d:%d:%d", stdtm.tm_year + 1900, stdtm.tm_mon + 1, stdtm.tm_mday,
stdtm.tm_hour, stdtm.tm_min, stdtm.tm_sec);
return buf;
}
// 日志打印函數(shù)
// 日志格式: [等級(jí)] [時(shí)間] [進(jìn)程id] :消息體
void logMessage(int level, const char* format, ...)
{
// 1.形成左邊的固定格式
char logLeft[1024];
char logRight[1024];
std::string logLevel = toLevelString(level);
std::string curTime = getTime();
snprintf(logLeft, sizeof(logLeft), "[%s] [%s] [%d] : ", logLevel.c_str(), curTime.c_str(), getpid());
// 2.形成右邊的消息體格式
va_list ap;
va_start(ap, format);
vsnprintf(logRight, sizeof(logRight), format, ap);
va_end(ap);
// 3.進(jìn)行拼接,形成完整的日志 (此處可以根據(jù)需要重定向到文件中)
printf("%s%s\n", logLeft, logRight);
}
// err.h
#pragma once
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
SETSOCK_ERR,
BIND_ERR,
LISTEN_ERR,
ACCEPT_ERR,
CONNECT_ERR
};
初始化
- 首先對(duì)于服務(wù)器,我們需要先進(jìn)行創(chuàng)建,綁定,監(jiān)聽(tīng)。
- 然后需要先把數(shù)組中所有的位置初始化為無(wú)效,并將監(jiān)聽(tīng)套接字添加到該數(shù)組當(dāng)中,fd_array數(shù)組當(dāng)中保存的就是需要被select監(jiān)視讀事件是否就緒的文件描述符。
#pragma once
#include <iostream>
#include <string>
#include <sys/select.h>
#include <cerrno>
#include <cstring>
#include "log.hpp"
#include "Sock.hpp"
// 設(shè)置默認(rèn)端口號(hào)
static const uint16_t default_port = 8080;
// 設(shè)置默認(rèn)無(wú)效的文件描述符
static const int default_fd = -1;
class SelectServer
{
using type_t = int;
public:
SelectServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 1.先把數(shù)組中所有的位置初始化為無(wú)效
for (size_t i = 0; i < _N; i++)
{
_fd_array[i] = default_fd;
}
// 2.把_listen_fd監(jiān)聽(tīng)套接字設(shè)置進(jìn)_fd_array
_fd_array[0] = _listen_fd.Fd();
}
~SelectServer()
{
_listen_fd.Close();
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
static const int _N = sizeof(fd_set) * 8; // fd_array內(nèi)元素的個(gè)數(shù)
type_t _fd_array[_N]; // 文件描述符管理數(shù)組
};
運(yùn)行服務(wù)器
-
select服務(wù)器運(yùn)行起來(lái)以后就會(huì)就不斷調(diào)用select函數(shù)監(jiān)視讀事件是否就緒,每次調(diào)用select函數(shù)之前都需要重新設(shè)置readfds,所以我們應(yīng)該在每一次調(diào)用之前都要先將readfds,和nfds先設(shè)置好。
-
具體設(shè)置過(guò)程就是遍歷fd_array數(shù)組,將fd_array數(shù)組當(dāng)中的合法文件描述符添加到readfds當(dāng)中,并同時(shí)記錄最大的文件描述符值nfds,因?yàn)楹罄m(xù)調(diào)用select函數(shù)時(shí)需要將nfds作為第一個(gè)參數(shù)傳入。
-
當(dāng)select函數(shù)返回后,如果返回值為0,則說(shuō)明timeout時(shí)間耗盡,此時(shí)直接準(zhǔn)備進(jìn)行下一次select調(diào)用即可。
-
如果select的返回值為-1,則說(shuō)明select調(diào)用失敗,此時(shí)也讓服務(wù)器準(zhǔn)備進(jìn)行下一次select調(diào)用,但實(shí)際應(yīng)該進(jìn)一步判斷錯(cuò)誤碼,根據(jù)錯(cuò)誤碼來(lái)判斷是否應(yīng)該繼續(xù)調(diào)用select函數(shù)。
-
如果select的返回值大于0,則說(shuō)明select函數(shù)調(diào)用成功,此時(shí)已經(jīng)有文件描述符的讀事件就緒,接下來(lái)就應(yīng)該對(duì)就緒事件進(jìn)行處理。
class SelectServer
{
using type_t = int;
public:
SelectServer(uint16_t port = default_port)
:_port(port)
{
// ...
}
void Start()
{
while (true)
{
fd_set readfds;
FD_ZERO(&readfds);
// 令nfds等于第一個(gè)文件描述符
int nfds = _fd_array[0];
for (size_t i = 0; i < _N; i++)
{
// 設(shè)置有效的文件描述符到readfds中
if (_fd_array[i] != default_fd)
{
FD_SET(_fd_array[i], &readfds);
// 尋找最大的nfds
nfds = nfds > _fd_array[i] ? nfds : _fd_array[i];
}
}
// 實(shí)際的nfds要 + 1
nfds += 1;
// struct timeval timeout = { 2, 0 }; 為了方便演示,這里我們采用阻塞調(diào)用select
int n = select(nfds, &readfds, nullptr, nullptr, nullptr);
switch (n)
{
case -1:
// select出錯(cuò)
logMessage(Warning, "select fail : %s, errno code :%d", strerror(errno), errno);
break;
case 0:
// 沒(méi)有讀事件就緒
logMessage(Debuge, "select timeout : %s, errno code :%d", strerror(errno), errno);
break;
default:
logMessage(Debuge, "有%d個(gè)事件發(fā)生了!", n);
// 處理就緒的事件
HandleEvent(readfds);
break;
}
}
}
void HandleEvent(fd_set& readfds)
{
// 處理事件...
}
~SelectServer()
{
_listen_fd.Close();
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
static const int _N = sizeof(fd_set) * 8; // fd_array內(nèi)元素的個(gè)數(shù)
type_t _fd_array[_N]; // 文件描述符管理數(shù)組
};
事件處理
當(dāng)select檢測(cè)到有文件描述符的讀事件就緒并成功返回后,接下來(lái)就應(yīng)該對(duì)就緒事件進(jìn)行處理了,這里編寫(xiě)一個(gè)HandlerEvent函數(shù),當(dāng)讀事件就緒后就調(diào)用該函數(shù)進(jìn)行事件處理。
-
在進(jìn)行事件處理時(shí)需要遍歷fd_array數(shù)組當(dāng)中的文件描述符,依次判斷各個(gè)文件描述符對(duì)應(yīng)的讀事件是否就緒,如果就緒則需要進(jìn)行事件處理。
-
當(dāng)一個(gè)文件描述符的讀事件就緒后,還需要進(jìn)一步判斷該文件描述符是否是監(jiān)聽(tīng)套接字,如果是監(jiān)聽(tīng)套接字的讀事件就緒,那么就應(yīng)該調(diào)用accept函數(shù)將底層的連接獲取上來(lái)。但是光光調(diào)用accept將連接獲取上來(lái)還不夠,為了下一次調(diào)用select函數(shù)時(shí)能夠讓select幫我們監(jiān)視新連接的讀事件是否就緒,在連接獲取上來(lái)后還應(yīng)該將該連接對(duì)應(yīng)的文件描述符添加到fd_array數(shù)組當(dāng)中,這樣在下一次調(diào)用select函數(shù)前對(duì)readfds重新設(shè)置時(shí)就能將該文件描述符添加進(jìn)去了。
-
如果是與客戶端建立的連接對(duì)應(yīng)的讀事件就緒,那么就應(yīng)該調(diào)用recv函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù),如果讀取成功則將讀到的數(shù)據(jù)在服務(wù)器端進(jìn)行打印并轉(zhuǎn)發(fā)給客戶端。如果調(diào)用recv函數(shù)讀取失敗或者客戶端關(guān)閉了連接,那么select服務(wù)器也應(yīng)該調(diào)用close函數(shù)關(guān)閉對(duì)應(yīng)的連接,但此時(shí)光光關(guān)閉連接也是不夠的,還應(yīng)該將該連接對(duì)應(yīng)的文件描述符從fd_array數(shù)組當(dāng)中清除,否則后續(xù)調(diào)用的select函數(shù)還會(huì)幫我們監(jiān)視該連接的讀事件是否就緒,但實(shí)際已經(jīng)不需要了。
class SelectServer
{
using type_t = int;
public:
SelectServer(uint16_t port = default_port)
:_port(port)
{
// ...
}
void Start()
{
// ...
}
void HandleEvent(fd_set& readfds)
{
// 處理就緒事件
for (size_t i = 0; i < _N; i++)
{
if ((_fd_array[i] == _listen_fd.Fd()) && (FD_ISSET(_fd_array[i], &readfds)))
{
// 處理連接事件
Accepter();
}
else if(_fd_array[i] != _listen_fd.Fd() && (FD_ISSET(_fd_array[i], &readfds)))
{
// 處理讀事件
ServiceIO(_fd_array[i]);
}
// 可以在HandleEvent里面添加 writefds參數(shù),在這里繼續(xù)判斷處理寫(xiě)事件
// ...
}
// Print是打印_fd_array[]里面的內(nèi)容,為了我們調(diào)試我們的代碼。
Print();
}
void Accepter()
{
std::string client_ip;
uint16_t client_port;
int sockfd = _listen_fd.Accept(&client_ip, &client_port);
if (sockfd < 0) return;
// 將新的sockfd設(shè)置進(jìn)_fd_array里面
size_t i = 1;
for (; i < _N; i++)
{
if (_fd_array[i] == default_fd)
{
break;
}
}
if (i >= _N)
{
//_fd_array滿了
logMessage(Warning, "_fd_array is full!");
close(sockfd);
}
else
{
_fd_array[i] = sockfd;
}
}
void ServiceIO(int fd)
{
char buf[1024];
ssize_t n = recv(fd, buf, sizeof(buf), 0);
if (n <= 0)
{
if (errno == EINTR)
{
logMessage(Info, "ServiceIO's recv was interrupted by the signal ...");
return;
}
else if (n == 0)
{
logMessage(Info, "ServiceIO's recv reached the end of file, fd %d -> fd %d", fd, default_fd);
}
else
{
logMessage(Warning, "ServiceIO's recv fail: %s , errno code :%d", strerror(errno), errno);
}
// 關(guān)閉fd
close(fd);
// 將fd從_fd_array[]中移除
for (size_t i = 0; i < _N; i++)
{
if (_fd_array[i] == fd)
{
_fd_array[i] = default_fd;
break;
}
}
}
else
{
// 處理讀到\r\n
buf[n - 2] = 0;
buf[n - 1] = 0;
std::cout << "client# " << buf << std::endl;
std::string respond = buf;
respond += "[Server echo]\n";
// 這里也要進(jìn)行判斷寫(xiě)事件是否就緒,我們先忽略
send(fd, respond.c_str(), respond.size(), 0);
}
}
void Print()
{
// 打印_fd_array[]
std::cout << "_fd_array[] : ";
for (int i = 0; i < _N; i++)
{
if (_fd_array[i] != default_fd)
{
std::cout << _fd_array[i] << " ";
}
}
std::cout << std::endl;
}
~SelectServer()
{
_listen_fd.Close();
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
static const int _N = sizeof(fd_set) * 8; // fd_array內(nèi)元素的個(gè)數(shù)
type_t _fd_array[_N]; // 文件描述符管理數(shù)組
};
當(dāng)前我們編寫(xiě)的select
服務(wù)器實(shí)際還存在一些問(wèn)題,我們暫時(shí)先進(jìn)行忽略(后面的poll
,epoll
也存在相同的問(wèn)題):
-
服務(wù)器不能直接調(diào)用
send
函數(shù),因?yàn)槲覀冎苯诱{(diào)用send函數(shù)時(shí)實(shí)際也分為“等”和“拷貝”兩步,我們的發(fā)送緩沖區(qū)可能并沒(méi)有足夠的空間供我們進(jìn)行寫(xiě)入,所以我們也應(yīng)該將“等”的這個(gè)過(guò)程交給select函數(shù),因此在每次調(diào)用select函數(shù)之前,除了需要重新設(shè)置readfds還需要重新設(shè)置writefds,因此我們還需要一個(gè)數(shù)組來(lái)保存需要被監(jiān)視寫(xiě)事件是否就緒的文件描述符,當(dāng)某一文件描述符的寫(xiě)事件就緒時(shí)我們才能夠調(diào)用send函數(shù)向客戶端發(fā)送數(shù)據(jù)。 -
沒(méi)有定制協(xié)議。代碼中讀取數(shù)據(jù)時(shí)并沒(méi)有按照某種規(guī)則進(jìn)行讀取,此時(shí)就可能造成粘包或者數(shù)據(jù)讀取不全的問(wèn)題,根本原因就是因?yàn)槲覀儧](méi)有定制協(xié)議,比如HTTP協(xié)議規(guī)定在讀取底層數(shù)據(jù)時(shí)讀取到空行就表明讀完了一個(gè)HTTP報(bào)頭,此時(shí)再根據(jù)HTTP報(bào)頭當(dāng)中的Content-Length屬性得知正文的長(zhǎng)度,最終就能夠讀取到一個(gè)完整的HTTP報(bào)文,HTTP協(xié)議通過(guò)這種方式就避免了粘包和數(shù)據(jù)讀取不全的問(wèn)題。
select服務(wù)器測(cè)試
運(yùn)行select
服務(wù)器時(shí)需要先實(shí)例化出一個(gè)SelectServer對(duì)象,然后讓select服務(wù)器直接調(diào)用進(jìn)行Start
函數(shù)后就可以運(yùn)行服務(wù)器了。
#include <iostream>
#include <memory>
#include "SelectServer.hpp"
int main()
{
std::unique_ptr<SelectServer> up(new SelectServer);
up->Start();
return 0;
}
至此簡(jiǎn)單的select服務(wù)器代碼已經(jīng)編寫(xiě)完畢,我們先使用telnet工具進(jìn)行測(cè)試一下,telnet工具連接我們的服務(wù)器以后,此時(shí)通過(guò)telnet向服務(wù)器發(fā)送的數(shù)據(jù)就能夠被服務(wù)器讀到并且打印輸出了。
5、select的優(yōu)缺點(diǎn)
select的優(yōu)點(diǎn):
- 可以同時(shí)等待多個(gè)文件描述符,并且只負(fù)責(zé)等待,實(shí)際的IO操作由accept、recv、send等接口來(lái)完成,這些接口在進(jìn)行IO操作時(shí)不會(huì)被阻塞。
- select同時(shí)等待多個(gè)文件描述符,因此可以將“等”的時(shí)間重疊,提高了IO的效率。
當(dāng)然,這也是所有多路轉(zhuǎn)接接口的優(yōu)點(diǎn)。
select的缺點(diǎn):
- 每次調(diào)用select,都需要手動(dòng)設(shè)置fd集合,從接口使用角度來(lái)說(shuō)也非常不便。
- 每次調(diào)用select,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài),這個(gè)開(kāi)銷在fd很多時(shí)會(huì)變大。
- 同時(shí)每次調(diào)用select都需要在內(nèi)核遍歷傳遞進(jìn)來(lái)的所有fd,這個(gè)開(kāi)銷在fd很多時(shí)也很大。
- select可監(jiān)控的文件描述符數(shù)量有限。
6、select的適用場(chǎng)景
多路轉(zhuǎn)接接口select
、poll
和epoll
,需要在一定的場(chǎng)景下使用,如果場(chǎng)景選擇的不適宜,可能會(huì)適得其反。
-
多路轉(zhuǎn)接接口一般適用于多連接,且多連接中只有少部分連接比較活躍。因?yàn)樯倭窟B接比較活躍,也就意味著幾乎所有的連接在進(jìn)行IO操作時(shí),都需要花費(fèi)大量時(shí)間來(lái)等待事件就緒,此時(shí)使用多路轉(zhuǎn)接接口就可以將這些等的事件進(jìn)行重疊,提高IO效率。
-
對(duì)于多連接中大部分連接都很活躍的場(chǎng)景,其實(shí)并不適合使用多路轉(zhuǎn)接。因?yàn)槊總€(gè)連接都很活躍,也就意味著任何時(shí)刻每個(gè)連接上的事件基本都是就緒的,此時(shí)根本不需要?jiǎng)佑枚嗦忿D(zhuǎn)接接口來(lái)幫我們進(jìn)行等待,畢竟使用多路轉(zhuǎn)接接口也是需要花費(fèi)系統(tǒng)的時(shí)間和空間資源的。
多連接中只有少量連接是比較活躍的,比如聊天工具QQ,我們登錄QQ后大部分時(shí)間其實(shí)是沒(méi)有聊天的,此時(shí)服務(wù)器端不可能每一個(gè)用戶都派發(fā)一個(gè)線程調(diào)用一個(gè)read函數(shù)阻塞等待讀事件就緒。
多連接中大部分連接都很活躍,比如企業(yè)當(dāng)中進(jìn)行數(shù)據(jù)備份時(shí),兩臺(tái)服務(wù)器之間不斷在交互數(shù)據(jù),這時(shí)的連接是特別活躍的,幾乎不需要等的過(guò)程,也就沒(méi)必要使用多路轉(zhuǎn)接接口了。
二、I/O多路轉(zhuǎn)接之poll
poll也是系統(tǒng)提供的一個(gè)多路轉(zhuǎn)接接口。
poll系統(tǒng)調(diào)用也可以讓我們的程序同時(shí)監(jiān)視多個(gè)文件描述符上的事件是否就緒,和select的定位是一樣的,適用場(chǎng)景也是一樣的。
1、poll函數(shù)
函數(shù)原型:
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
參數(shù)說(shuō)明:
-
fds
:一個(gè)poll函數(shù)監(jiān)視的結(jié)構(gòu)列表,每一個(gè)元素包含三部分內(nèi)容:文件描述符、監(jiān)視的事件集合、就緒的事件集合。 -
nfds
:表示fds數(shù)組的長(zhǎng)度。 -
timeout
:表示poll函數(shù)的超時(shí)時(shí)間,單位是毫秒(ms)。
參數(shù)timeout
的取值:
- -1:poll調(diào)用后進(jìn)行阻塞等待,直到被監(jiān)視的某個(gè)文件描述符上的某個(gè)事件就緒。
- 0:poll調(diào)用后進(jìn)行非阻塞等待,無(wú)論被監(jiān)視的文件描述符上的事件是否就緒,poll檢測(cè)后都會(huì)立即返回。
- 特定的時(shí)間值:poll調(diào)用后在指定的時(shí)間內(nèi)進(jìn)行阻塞等待,如果被監(jiān)視的文件描述符上一直沒(méi)有事件就緒,則在該時(shí)間后poll進(jìn)行超時(shí)返回。
返回值說(shuō)明:
- 如果函數(shù)調(diào)用成功,則返回有事件就緒的文件描述符個(gè)數(shù)。
- 如果timeout時(shí)間耗盡,則返回0。
- 如果函數(shù)調(diào)用失敗,則返回-1,同時(shí)錯(cuò)誤碼會(huì)被設(shè)置。
poll調(diào)用失敗時(shí),錯(cuò)誤碼可能被設(shè)置為:
-
EFAULT
:fds數(shù)組不包含在調(diào)用程序的地址空間中。 -
EINTR
:此調(diào)用被信號(hào)所中斷。 -
EINVAL
:nfds值超過(guò)RLIMIT_NOFILE
值。 -
ENOMEM
:核心內(nèi)存不足。
2、struct pollfd結(jié)構(gòu)
struct pollfd結(jié)構(gòu)當(dāng)中包含三個(gè)成員:
struct pollfd
{
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
-
fd
:特定的文件描述符,若設(shè)置為負(fù)值則忽略events
字段并且revents
字段返回0。 -
events
:需要監(jiān)視該文件描述符上的哪些事件,由用戶設(shè)置。 -
revents
:poll函數(shù)返回時(shí)告知用戶該文件描述符上的哪些事件已經(jīng)就緒,由內(nèi)核進(jìn)行設(shè)置。
events
和revents
的取值:
事件 | 描述 | 是否可作為輸入 | 是否可作為輸出 |
---|---|---|---|
POLLIN | 數(shù)據(jù)(包括普通數(shù)據(jù)和優(yōu)先數(shù)據(jù))可讀 | 是 | 是 |
POLLRDNORM | 普通數(shù)據(jù)可讀 | 是 | 是 |
POLLRDBAND | 優(yōu)先級(jí)帶數(shù)據(jù)可讀(Linux不支持) | 是 | 是 |
POLLPRI | 高優(yōu)先級(jí)數(shù)據(jù)可讀,比如TCP帶外數(shù)據(jù) | 是 | 是 |
POLLOUT | 數(shù)據(jù)(包括普通數(shù)據(jù)和優(yōu)先數(shù)據(jù))可寫(xiě) | 是 | 是 |
POLLWRNORM | 普通數(shù)據(jù)可寫(xiě) | 是 | 是 |
POLLWRBAND | 優(yōu)先級(jí)帶數(shù)據(jù)可寫(xiě) | 是 | 是 |
POLLRDHUP | TCP連接被對(duì)方關(guān)閉,或者對(duì)方關(guān)閉了寫(xiě)操作,它由GNU引入 | 是 | 是 |
POLLERR | 錯(cuò)誤 | 否 | 是 |
POLLHUP | 掛起。比如管道的寫(xiě)端被關(guān)閉后,讀端描述符上將收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符沒(méi)有打開(kāi) | 否 | 是 |
這些取值實(shí)際都是以宏的方式進(jìn)行定義的,它們的二進(jìn)制序列當(dāng)中有且只有一個(gè)比特位是1,且為1的比特位是各不相同的,我們實(shí)際使用時(shí)最常用的就是POLLIN
和POLLOUT
。
- 因此在調(diào)用poll函數(shù)之前,可以通過(guò)或(
|
)運(yùn)算符將要監(jiān)視的事件添加到events
成員當(dāng)中。 - 在poll函數(shù)返回后,可以通過(guò)與(
&
)運(yùn)算符檢測(cè)revents
成員中是否包含特定事件,以得知對(duì)應(yīng)文件描述符的特定事件是否就緒。
2、poll服務(wù)器
poll的工作流程和select是基本類似的,這里我們也實(shí)現(xiàn)一個(gè)簡(jiǎn)單poll服務(wù)器,該服務(wù)器也只是讀取客戶端發(fā)來(lái)的數(shù)據(jù)打印并進(jìn)行轉(zhuǎn)發(fā)。
初始化
-
首先對(duì)于服務(wù)器,我們需要先向操作系統(tǒng)申請(qǐng)一塊內(nèi)存用于定義一個(gè)
fds
數(shù)組,該數(shù)組當(dāng)中的每個(gè)位置都是一個(gè)struct pollfd
結(jié)構(gòu),后續(xù)調(diào)用poll函數(shù)時(shí)會(huì)作為參數(shù)進(jìn)行傳入,然后進(jìn)行創(chuàng)建,綁定,監(jiān)聽(tīng)套接字。 -
最后需要把fds數(shù)組中所有的位置初始化為無(wú)效,并將監(jiān)聽(tīng)套接字添加到該數(shù)組當(dāng)中,表示服務(wù)器剛開(kāi)始運(yùn)行時(shí)只需要監(jiān)視監(jiān)聽(tīng)套接字的讀事件。
#pragma once
#include <iostream>
#include <string>
#include <poll.h>
#include <cerrno>
#include <cstring>
#include "log.hpp"
#include "Sock.hpp"
// 設(shè)置默認(rèn)端口號(hào)
static const uint16_t default_port = 8080;
// 設(shè)置默認(rèn)無(wú)效的文件描述符
static const int default_fd = -1;
// 設(shè)置默認(rèn)的無(wú)效事件
static const short default_event = 0;
class PollServer
{
public:
PollServer(uint16_t port = default_port, size_t num = 1024)
:_port(port), _num(num), _fd_array(new struct pollfd[_num])
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 1.先把數(shù)組中所有的位置初始化為無(wú)效
for (size_t i = 0; i < _num; i++)
{
_fd_array[i].fd = default_fd;
_fd_array[i].events = default_event;
}
// 2.把_listen_fd監(jiān)聽(tīng)套接字設(shè)置進(jìn)_fd_array,并設(shè)置關(guān)心事件為讀事件
_fd_array[0].fd = _listen_fd.Fd();
_fd_array[0].events = POLLIN;
}
~PollServer()
{
_listen_fd.Close();
if (_fd_array)
{
delete[] _fd_array;
_fd_array = nullptr;
}
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
size_t _num; // _fd_array的長(zhǎng)度
struct pollfd* _fd_array; // 指向了管理fd的數(shù)組
};
運(yùn)行服務(wù)器
- poll服務(wù)器就不斷調(diào)用poll函數(shù)監(jiān)視讀事件是否就緒。如果poll函數(shù)的返回值大于0,則說(shuō)明poll函數(shù)調(diào)用成功,此時(shí)已經(jīng)有文件描述符的讀事件就緒,接下來(lái)就應(yīng)該對(duì)就緒事件進(jìn)行處理。
- 如果poll函數(shù)的返回值等于0,則說(shuō)明timeout時(shí)間耗盡,此時(shí)直接準(zhǔn)備進(jìn)行下一次poll調(diào)用即可。
- 如果poll函數(shù)的返回值為-1,則說(shuō)明poll調(diào)用失敗,此時(shí)也讓服務(wù)器準(zhǔn)備進(jìn)行下一次poll調(diào)用,但實(shí)際應(yīng)該進(jìn)一步判斷錯(cuò)誤碼,根據(jù)錯(cuò)誤碼來(lái)判斷是否應(yīng)該繼續(xù)調(diào)用poll函數(shù)。
可以看出這里我們使用poll時(shí)要比使用select時(shí)要簡(jiǎn)單的多!
class PollServer
{
public:
PollServer(uint16_t port = default_port, size_t num = 1024)
:_port(port), _num(num), _fd_array(new struct pollfd[_num])
{
// ....
}
void Start()
{
while (true)
{
// int timeout = 2000; 為了方便演示這里采用了阻塞調(diào)用
int n = poll(_fd_array, _num, -1);
switch (n)
{
case -1:
// poll出錯(cuò)
logMessage(Warning, "poll fail : %s, errno code :%d", strerror(errno), errno);
break;
case 0:
// 沒(méi)有讀事件就緒
logMessage(Debuge, "poll timeout : %s, errno code :%d", strerror(errno), errno);
break;
default:
logMessage(Debuge, "有%d個(gè)事件發(fā)生了!", n);
HandleEvent();
break;
}
}
}
void HandleEvent()
{
// 處理事件
}
~PollServer()
{
_listen_fd.Close();
if (_fd_array)
{
delete[] _fd_array;
_fd_array = nullptr;
}
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
size_t _num; // _fd_array的長(zhǎng)度
struct pollfd* _fd_array; // 指向了管理fd的數(shù)組
};
事件處理
當(dāng)poll檢測(cè)到有文件描述符的讀事件就緒,就會(huì)在其對(duì)應(yīng)的struct pollfd
結(jié)構(gòu)中的revents
成員中添加讀事件并返回,接下來(lái)poll服務(wù)器就應(yīng)該對(duì)就緒事件進(jìn)行處理了,事件處理過(guò)程如下:
-
首先遍歷fds數(shù)組中的每個(gè)struct pollfd結(jié)構(gòu),如果該結(jié)構(gòu)當(dāng)中的fd有效,且revents當(dāng)中包含讀事件,則說(shuō)明該文件描述符的讀事件就緒,接下來(lái)就需要進(jìn)一步判斷該文件描述符是監(jiān)聽(tīng)套接字還是與客戶端建立的套接字。
-
如果是監(jiān)聽(tīng)套接字的讀事件就緒,則調(diào)用accept函數(shù)將底層建立好的連接獲取上來(lái),并將獲取到的套接字添加到fds數(shù)組當(dāng)中,表示下一次調(diào)用poll函數(shù)時(shí)需要監(jiān)視該套接字的讀事件,如果數(shù)組滿了我們可以進(jìn)行動(dòng)態(tài)擴(kuò)容。
-
如果是與客戶端建立的連接對(duì)應(yīng)的讀事件就緒,則調(diào)用recv函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù),并將讀取到的數(shù)據(jù)在服務(wù)器端進(jìn)行打印并轉(zhuǎn)發(fā)。
-
如果在調(diào)用recv函數(shù)時(shí)發(fā)現(xiàn)客戶端將連接關(guān)閉或recv函數(shù)調(diào)用失敗,則poll服務(wù)器也直接關(guān)閉對(duì)應(yīng)的連接,并將該連接對(duì)應(yīng)的文件描述符從fds數(shù)組當(dāng)中清除,表示下一次調(diào)用poll函數(shù)時(shí)無(wú)需再監(jiān)視該套接字的讀事件。
class PollServer
{
public:
PollServer(uint16_t port = default_port, size_t num = 1024)
:_port(port), _num(num), _fd_array(new struct pollfd[_num])
{
// ...
}
void Start()
{
// ...
}
void HandleEvent()
{
// 處理就緒事件
for (size_t i = 0; i < _num; i++)
{
if (_fd_array[i].fd == default_fd)
{
continue;
}
// 處理讀取事件
if (_fd_array[i].revents & POLLIN)
{
// 處理連接事件
if (_fd_array[i].fd == _listen_fd.Fd())
{
Accepter();
}
else
{
// 處理讀事件
Recver(_fd_array[i].fd);
}
}
else if (_fd_array[i].revents & POLLOUT)
{
// 處理寫(xiě)事件
}
}
// Print是打印_fd_array[]里面的fd,為了我們調(diào)試我們的代碼。
Print();
}
void Accepter()
{
std::string client_ip;
uint16_t client_port;
int sockfd = _listen_fd.Accept(&client_ip, &client_port);
if (sockfd < 0) return;
// 將新的sockfd設(shè)置進(jìn)_fd_array里面
size_t i = 1;
for (; i < _num; i++)
{
if (_fd_array[i].fd == default_fd)
{
break;
}
}
if (i >= _num)
{
//_fd_array滿了
size_t new_num = _num * 2;
logMessage(Info, "_fd_array is full!, _num:%d -> _num:%d", _num, new_num);
// 進(jìn)行擴(kuò)容
struct pollfd* pollfd_array = new struct pollfd[new_num];
for (size_t i = 0; i < new_num; i++)
{
pollfd_array[i].fd = default_fd;
pollfd_array[i].events = default_event;
}
memcpy(pollfd_array, _fd_array, sizeof(struct pollfd) * _num);
delete[] _fd_array;
_fd_array = pollfd_array;
_num = new_num;
}
_fd_array[i].fd = sockfd;
_fd_array[i].events = POLLIN;
//_fd_array[i].events = POLLIN | POLLOUT;
}
void Recver(int fd)
{
char buf[1024];
ssize_t n = recv(fd, buf, sizeof(buf), 0);
if (n <= 0)
{
if (errno == EINTR)
{
logMessage(Info, "Recver's recv was interrupted by the signal ...");
return;
}
else if (n == 0)
{
logMessage(Info, "Recver's recv reached the end of file, fd %d -> fd %d", fd, default_fd);
}
else
{
logMessage(Warning, "Recver's recv fail: %s , errno code :%d", strerror(errno), errno);
}
// 關(guān)閉fd
close(fd);
// 將fd從_fd_array[]中移除
for (size_t i = 0; i < _num; i++)
{
if (_fd_array[i].fd == fd)
{
_fd_array[i].fd = default_fd;
_fd_array[i].events = default_event;
break;
}
}
}
else
{
// 處理讀到的\r\n
buf[n - 2] = 0;
buf[n - 1] = 0;
std::cout << "client# " << buf << std::endl;
std::string respond = buf;
respond += "[Server echo]\n";
// 這里也要進(jìn)行判斷寫(xiě)事件是否就緒
send(fd, respond.c_str(), respond.size(), 0);
}
}
void Print()
{
// 打印_fd_array[]
std::cout << "_fd_array[] : ";
for (int i = 0; i < _num; i++)
{
if (_fd_array[i].fd != default_fd)
{
std::cout << _fd_array[i].fd << " ";
}
}
std::cout << std::endl;
}
~PollServer()
{
_listen_fd.Close();
if (_fd_array)
{
delete[] _fd_array;
_fd_array = nullptr;
}
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
size_t _num; // _fd_array的長(zhǎng)度
struct pollfd* _fd_array; // 指向了管理fd的數(shù)組
};
poll服務(wù)器測(cè)試
運(yùn)行poll
服務(wù)器時(shí)需要先實(shí)例化出一個(gè)PollServer對(duì)象,然后讓PollServer服務(wù)器直接調(diào)用進(jìn)行Start
函數(shù)后就可以運(yùn)行服務(wù)器了。
#include <iostream>
#include <memory>
#include "PollServer.hpp"
int main()
{
std::unique_ptr<PollServer> up(new PollServer);
up->Start();
return 0;
}
和select一樣,我們也使用telnet工具進(jìn)行測(cè)試一下,測(cè)試的流程一樣
3、poll的優(yōu)缺點(diǎn)
poll的優(yōu)點(diǎn):
-
struct pollfd
結(jié)構(gòu)當(dāng)中包含了events
和revents
,相當(dāng)于將select
的輸入輸出型參數(shù)進(jìn)行分離,因此在每次調(diào)用poll
之前,不需要像select
一樣重新對(duì)參數(shù)進(jìn)行設(shè)置。 - poll可監(jiān)控的文件描述符數(shù)量沒(méi)有限制。
- 當(dāng)然,poll也可以同時(shí)等待多個(gè)文件描述符,能夠提高IO的效率。
poll的缺點(diǎn):
- 和select函數(shù)一樣,當(dāng)poll返回后,需要遍歷fds數(shù)組來(lái)獲取就緒的文件描述符。
- 每次調(diào)用poll,都需要把大量的
struct pollfd
結(jié)構(gòu)從用戶態(tài)拷貝到內(nèi)核態(tài),這個(gè)開(kāi)銷也會(huì)隨著poll監(jiān)視的文件描述符數(shù)目的增多而增大。 - 同時(shí)每次調(diào)用poll都需要在內(nèi)核遍歷傳遞進(jìn)來(lái)的所有fd,這個(gè)開(kāi)銷在fd很多時(shí)也變大。
三、I/O多路轉(zhuǎn)接之epoll
epoll初識(shí),epoll也是系統(tǒng)提供的一個(gè)多路轉(zhuǎn)接接口。
- epoll系統(tǒng)調(diào)用也可以讓我們的程序同時(shí)監(jiān)視多個(gè)文件描述符上的事件是否就緒,與select和poll的定位是一樣的,適用場(chǎng)景也相同。
- epoll在命名上比poll多了一個(gè)e,這個(gè)e可以理解成是extend,是為處理大批量句柄而作了改進(jìn)的poll。
- epoll在2.5.44內(nèi)核中被引進(jìn),它幾乎具備了select和poll的所有優(yōu)點(diǎn),被公認(rèn)為L(zhǎng)inux2.6下性能最好的多路I/O就緒通知方法。
1、epoll的相關(guān)系統(tǒng)調(diào)用
epoll有三個(gè)相關(guān)的系統(tǒng)調(diào)用,分別是epoll_create
、epoll_ctl
和epoll_wait
。
1、epoll_create函數(shù)
int epoll_create(int size);
功能:
-
epoll_create
函數(shù)用于創(chuàng)建一個(gè)epoll
模型。
參數(shù)說(shuō)明:
-
size
:自從Linux2.6.8之后,size參數(shù)是被忽略的,但size的值必須設(shè)置為大于0的值。
返回值說(shuō)明:
-
epoll
模型創(chuàng)建成功返回其對(duì)應(yīng)的文件描述符,否則返回-1
,同時(shí)錯(cuò)誤碼會(huì)被設(shè)置。
注意: 當(dāng)不再使用時(shí),必須調(diào)用close函數(shù)關(guān)閉epoll模型對(duì)應(yīng)的文件描述符,當(dāng)所有引用epoll實(shí)例的文件描述符都已關(guān)閉時(shí),內(nèi)核將銷毀該實(shí)例并釋放相關(guān)資源。
2、epoll_ctl函數(shù)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
功能:
-
epoll_ctl
函數(shù)用于向指定的epoll模型中注冊(cè)事件。
參數(shù)說(shuō)明:
-
epfd
:指定的epoll模型。 -
op
:表示具體的動(dòng)作,用三個(gè)宏來(lái)表示。 -
fd
:需要監(jiān)視的文件描述符。 -
event
:需要監(jiān)視該文件描述符上的哪些事件。
第二個(gè)參數(shù)op的取值有以下三種:
宏名稱 | 功能 |
---|---|
EPOLL_CTL_ADD | 注冊(cè)新的文件描述符到指定的epoll模型中 |
EPOLL_CTL_MOD | 修改已經(jīng)注冊(cè)的文件描述符的監(jiān)聽(tīng)事件 |
EPOLL_CTL_DEL | 從epoll模型中刪除指定的文件描述符 |
返回值說(shuō)明:
- 函數(shù)調(diào)用成功返回0,調(diào)用失敗返回-1,同時(shí)錯(cuò)誤碼會(huì)被設(shè)置。
3、epoll_wait函數(shù)
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
功能:
-
epoll_wait
函數(shù)用于收集監(jiān)視的事件中已經(jīng)就緒的事件。
參數(shù)說(shuō)明:
-
epfd
:指定的epoll模型。 -
events
:內(nèi)核會(huì)將已經(jīng)就緒的事件拷貝到events數(shù)組當(dāng)中(events不能是空指針,內(nèi)核只負(fù)責(zé)將就緒事件拷貝到該數(shù)組中,不會(huì)幫我們?cè)谟脩魬B(tài)中分配內(nèi)存)。 -
maxevents
:events數(shù)組中的元素個(gè)數(shù)。 -
timeout
:表示epoll_wait函數(shù)的超時(shí)時(shí)間,單位是毫秒(ms)。
參數(shù)timeout
的取值:
-
-1
:epoll_wait調(diào)用后進(jìn)行阻塞等待,直到被監(jiān)視的某個(gè)文件描述符上的某個(gè)事件就緒。 -
0
:epoll_wait調(diào)用后進(jìn)行非阻塞等待,無(wú)論被監(jiān)視的文件描述符上的事件是否就緒,epoll_wait檢測(cè)后都會(huì)立即返回。 -
特定的時(shí)間值:epoll_wait調(diào)用后在直到的時(shí)間內(nèi)進(jìn)行阻塞等待,如果被監(jiān)視的文件描述符上一直沒(méi)有事件就緒,則在該時(shí)間后epoll_wait進(jìn)行超時(shí)返回。
返回值說(shuō)明:
- 如果函數(shù)調(diào)用成功,則返回有事件就緒的文件描述符個(gè)數(shù)。
- 如果
timeout
時(shí)間耗盡,則返回0。 - 如果函數(shù)調(diào)用失敗,則返回
-1
,同時(shí)錯(cuò)誤碼會(huì)被設(shè)置。
epoll_wait
調(diào)用失敗時(shí),錯(cuò)誤碼可能被設(shè)置為:
-
EBADF
:傳入的epoll模型對(duì)應(yīng)的文件描述符無(wú)效。 -
EFAULT
:events指向的數(shù)組空間無(wú)法通過(guò)寫(xiě)入權(quán)限訪問(wèn)。 -
EINTR
:此調(diào)用被信號(hào)所中斷。 -
EINVAL
:epfd不是一個(gè)epoll模型對(duì)應(yīng)的文件描述符,或傳入的maxevents值小于等于0。
2、struct epoll_event結(jié)構(gòu)
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
結(jié)構(gòu)中有兩個(gè)成員:
- 第一個(gè)成員
events
表示的是需要監(jiān)視的事件。- 當(dāng)我們使用
epoll_ctl
函數(shù)時(shí),通過(guò)這個(gè)字段告訴操作系統(tǒng)我們要關(guān)心那些事件。 - 當(dāng)我們使用
epoll_wait
函數(shù)時(shí),通過(guò)這個(gè)字段操作系統(tǒng)告訴我們關(guān)心的那些事件就緒了。
- 當(dāng)我們使用
- 第二個(gè)成員
data
是一個(gè)聯(lián)合體結(jié)構(gòu),里面記錄的是我們用戶的相關(guān)數(shù)據(jù),當(dāng)我們?cè)?code>epoll_ctrl時(shí)進(jìn)行設(shè)置,在epoll_wait
時(shí)就能夠拿到這個(gè)數(shù)據(jù),一般選擇使用該結(jié)構(gòu)當(dāng)中的fd,這樣在epoll_wait
時(shí)我們通過(guò)第一個(gè)字段能夠拿到發(fā)生了什么事件,通過(guò)第二個(gè)字段能夠拿到了發(fā)生的事件是那個(gè)文件描述符的。
events
的常用取值如下,這些取值實(shí)際也是以宏的方式進(jìn)行定義的,它們的二進(jìn)制序列當(dāng)中有且只有一個(gè)比特位是1,且為1的比特位是各不相同的。
宏名稱 | 宏功能 |
---|---|
EPOLLIN | 表示對(duì)應(yīng)的文件描述符可以讀(包括對(duì)端SOCKET正常關(guān)閉) |
EPOLLOUT | 表示對(duì)應(yīng)的文件描述符可以寫(xiě) |
EPOLLPRI | 表示對(duì)應(yīng)的文件描述符有緊急的數(shù)據(jù)可讀(這里應(yīng)該表示有帶外數(shù)據(jù)到來(lái)) |
EPOLLERR | 表示對(duì)應(yīng)的文件描述符發(fā)送錯(cuò)誤 |
EPOLLHUP | 表示對(duì)應(yīng)的文件描述符被掛斷,即對(duì)端將文件描述符關(guān)閉了 |
EPOLLET | 將epoll的工作方式設(shè)置為邊緣觸發(fā)(Edge Triggered)模式 |
EPOLLONESHOT | 只監(jiān)聽(tīng)一次事件,當(dāng)監(jiān)聽(tīng)完這次事件之后,如果還需要繼續(xù)監(jiān)聽(tīng)該文件描述符的話,需要重新將該文件描述符添加到epoll模型中 |
可以看出epoll
不僅能夠監(jiān)視正常的IO事件,也能夠監(jiān)聽(tīng)對(duì)應(yīng)的文件描述符是否發(fā)生了錯(cuò)誤!
3、epoll的工作原理
1、三大機(jī)制
- 紅黑樹(shù)
- 就緒隊(duì)列
- 回調(diào)機(jī)制
當(dāng)某一進(jìn)程調(diào)用epoll_create
方法時(shí),Linux內(nèi)核會(huì)創(chuàng)建一個(gè)eventpoll
結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體又會(huì)在內(nèi)核中創(chuàng)建一顆紅黑樹(shù)和一個(gè)就緒隊(duì)列,這個(gè)結(jié)構(gòu)體中有兩個(gè)數(shù)據(jù)結(jié)構(gòu)成員與epoll的使用方式密切相關(guān)。
struct eventpoll
{
// ...
// 紅黑樹(shù)的根節(jié)點(diǎn),這棵樹(shù)中存儲(chǔ)著所有添加到epoll中的需要監(jiān)視的事件
struct rb_root rbr;
// 就緒隊(duì)列中則存放著將要通過(guò)epoll_wait返回給用戶的滿足條件的事件
struct list_head rdlist;
// ...
}
-
每一個(gè)epoll模型都有一個(gè)獨(dú)立的
eventpoll
結(jié)構(gòu)體,用于存放通過(guò)epoll_ctl
方法向epoll模型中添加進(jìn)來(lái)的事件,這些事件都會(huì)被掛載在紅黑樹(shù)中,當(dāng)我們想要增加一個(gè)事件,刪除一個(gè)事件,修改(本質(zhì)是查找)一個(gè)事件時(shí),我們的時(shí)間復(fù)雜度為 O ( l o g n ) O (logn) O(logn),相比于線性遍歷在數(shù)據(jù)量很大時(shí),我們的增刪改的效率依然很高。 -
所有添加到epoll中的事件都會(huì)與設(shè)備(例如:網(wǎng)卡)驅(qū)動(dòng)程序建立回調(diào)關(guān)系,也就是說(shuō):當(dāng)響應(yīng)的事件發(fā)生時(shí)(比如數(shù)據(jù)就緒了),會(huì)調(diào)用這個(gè)回調(diào)方法,這個(gè)回調(diào)方法在內(nèi)核中叫
ep_poll_callback
,它會(huì)將發(fā)生的事件添加到rdlist
雙鏈表(也就是我們的就緒隊(duì)列)中。 -
當(dāng)調(diào)用
epoll_wait
檢查是否有事件發(fā)生時(shí),只需要檢查eventpoll
對(duì)象中的rdlist
雙鏈表中是否有元素即可,這個(gè)操作的時(shí)間復(fù)雜度是 O ( 1 ) O(1) O(1),如果rdlist
不為空,則把發(fā)生的事件復(fù)制到用戶態(tài),同時(shí)將事件數(shù)量返回給用戶。
擴(kuò)展知識(shí)
- 在epoll中,對(duì)于每一個(gè)事件,都會(huì)建立一個(gè)
epitem
結(jié)構(gòu)體,紅黑樹(shù)和就緒隊(duì)列當(dāng)中的節(jié)點(diǎn)分別是基于epitem
結(jié)構(gòu)中的rbn
成員和rdllink
成員的,epitem
結(jié)構(gòu)當(dāng)中的成員ffd
記錄的是指定的文件描述符值,event
成員記錄的就是該文件描述符對(duì)應(yīng)的事件。
struct epitem
{
struct rb_node rbn; //紅黑樹(shù)節(jié)點(diǎn)
struct list_head rdllink; //雙向鏈表節(jié)點(diǎn)
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所屬的eventpoll對(duì)象
struct epoll_event event; //期待發(fā)生的事件類型
}
- 對(duì)于epitem結(jié)構(gòu)當(dāng)中rbn成員來(lái)說(shuō),ffd與event的含義是,需要監(jiān)視ffd上的event事件是否就緒。
- 對(duì)于epitem結(jié)構(gòu)當(dāng)中的rdlink成員來(lái)說(shuō),ffd與event的含義是,ffd上的event事件已經(jīng)就緒了。
2、 一些細(xì)節(jié)補(bǔ)充
- 紅黑樹(shù):
- 紅黑樹(shù)是一種二叉搜索樹(shù),因此必須有鍵值key,而這里的文件描述符就天然的可以作為紅黑樹(shù)的key值。
- 調(diào)用
epoll_ctl
向紅黑樹(shù)當(dāng)中新增節(jié)點(diǎn)時(shí),如果設(shè)置了EPOLLONESHOT
選項(xiàng),當(dāng)監(jiān)聽(tīng)完這次事件后,如果還需要繼續(xù)監(jiān)聽(tīng)該文件描述符則需要重新將其添加到epoll
模型中,本質(zhì)就是當(dāng)設(shè)置了EPOLLONESHOT
選項(xiàng)的事件就緒時(shí),操作系統(tǒng)會(huì)自動(dòng)將其從紅黑樹(shù)當(dāng)中刪除。 - 而如果調(diào)用epoll_ctl向紅黑樹(shù)當(dāng)中新增節(jié)點(diǎn)時(shí)沒(méi)有設(shè)置
EPOLLONESHOT
,那么該節(jié)點(diǎn)插入紅黑樹(shù)后就會(huì)一直存在,除非用戶調(diào)用epoll_ctl
將該節(jié)點(diǎn)從紅黑樹(shù)當(dāng)中刪除。
-
回調(diào)機(jī)制
-
對(duì)于
select
和poll
來(lái)說(shuō),操作系統(tǒng)在監(jiān)視多個(gè)文件描述符上的事件是否就緒時(shí),需要讓操作系統(tǒng)主動(dòng)對(duì)這多個(gè)文件描述符進(jìn)行輪詢檢測(cè),這一定會(huì)增加操作系統(tǒng)的負(fù)擔(dān)。 -
而對(duì)于
epoll
來(lái)說(shuō),操作系統(tǒng)不需要主動(dòng)進(jìn)行事件的檢測(cè),當(dāng)紅黑樹(shù)中監(jiān)視的事件就緒時(shí),驅(qū)動(dòng)程序會(huì)自動(dòng)調(diào)用對(duì)應(yīng)的回調(diào)方法,將紅黑樹(shù)中就緒的節(jié)點(diǎn)直接鏈入就緒隊(duì)列(不需要進(jìn)行拷貝,直接使用指針操作將節(jié)點(diǎn)插入鏈表中)。 -
采用回調(diào)機(jī)制最大的好處就是:不再需要操作系統(tǒng)主動(dòng)對(duì)就緒事件進(jìn)行檢測(cè)了,當(dāng)事件就緒時(shí)會(huì)自動(dòng)調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)進(jìn)行處理。
-
其他的細(xì)節(jié)
-
當(dāng)不斷有監(jiān)視的事件就緒時(shí),會(huì)不斷調(diào)用回調(diào)方法向就緒隊(duì)列當(dāng)中插入節(jié)點(diǎn),而上層也會(huì)不斷調(diào)用
epoll_wait
函數(shù)從就緒隊(duì)列當(dāng)中獲取節(jié)點(diǎn),這是典型的生產(chǎn)者消費(fèi)者模型。 -
由于就緒隊(duì)列可能會(huì)被多個(gè)執(zhí)行流同時(shí)訪問(wèn),因此必須要使用互斥鎖對(duì)其進(jìn)行保護(hù),
eventpoll
結(jié)構(gòu)當(dāng)中的lock
和mtx
就是用于保護(hù)臨界資源的,因此epoll
本身是線程安全的。 -
eventpoll
結(jié)構(gòu)當(dāng)中的wq
(wait queue)就是等待隊(duì)列,當(dāng)多個(gè)執(zhí)行流想要同時(shí)訪問(wèn)同一個(gè)epoll模型時(shí),就需要在該等待隊(duì)列下進(jìn)行等待。 -
epoll_create
函數(shù)在調(diào)用完畢,如果成功了會(huì)返回一個(gè)文件描述符,這是因?yàn)長(zhǎng)inux的設(shè)計(jì)理念是:一切皆文件,Linux將epoll
模型(紅黑樹(shù),就緒隊(duì)列,回調(diào)機(jī)制)也看成一個(gè)文件,將來(lái)我們的epoll
的相關(guān)接口一定是進(jìn)程在進(jìn)行調(diào)用,因此通過(guò)進(jìn)程的task_struct
(PCB)結(jié)構(gòu)一定能夠找到一個(gè)文件管理結(jié)構(gòu)file_struct
,在file_struct
結(jié)構(gòu)中有一個(gè)數(shù)組,通過(guò)這個(gè)數(shù)組的下標(biāo)(本質(zhì)就是文件描述符)我們能夠找到對(duì)應(yīng)的file
結(jié)構(gòu)體,因此通過(guò)這個(gè)file
結(jié)構(gòu)體我們也就能夠找到對(duì)應(yīng)的紅黑樹(shù)和就緒隊(duì)列了于是就可以對(duì)其進(jìn)行增刪改查了!
4、epoll服務(wù)器
初始化
- 首先對(duì)于服務(wù)器,我們需要先進(jìn)行創(chuàng)建,綁定,監(jiān)聽(tīng)套接字,然后使用
epoll_create
創(chuàng)建一個(gè)epoll
模型,然后使用epoll_ctl
將監(jiān)聽(tīng)套接字添加到epoll
模型中。
為了簡(jiǎn)化我們的代碼我們也對(duì)epoll
模型的系統(tǒng)調(diào)用進(jìn)行一下簡(jiǎn)單的封裝,其類名為Epoller
并放到另外一個(gè)頭文件中。
// EpollServer.hpp
#pragma once
#include <iostream>
#include "Epoller.hpp"
#include "Sock.hpp"
#include "log.hpp"
#include "err.h"
//設(shè)置默認(rèn)端口號(hào)
static const uint16_t default_port = 8080;
class EpollServer
{
public:
EpollServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 創(chuàng)建的epoll模型
_epoller.Create();
// 添加要關(guān)心的事件
_epoller.AddEvent(_listen_fd.Fd(), EPOLLIN);
}
~EpollServer()
{
_listen_fd.Close();
_epoller.Close();
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
Epoller _epoller; // epoll模型
static const size_t _num = 64; // _revs的長(zhǎng)度
struct epoll_event _revs[_num]; // epoll_wait就緒事件的緩沖區(qū)
};
// Epoller.hpp
#pragma once
#include <iostream>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <sys/epoll.h>
#include <unistd.h>
#include "log.hpp"
#include "err.h"
// 默認(rèn)無(wú)效的epfd
static const int default_epfd = -1;
class Epoller
{
public:
Epoller(int epfd = default_epfd)
:_epfd(epfd)
{}
void Create()
{
_epfd = epoll_create(1024);
if (_epfd < 0)
{
logMessage(Fatal, "epoll_create fail: %s, errno code %d", strerror(errno), errno);
exit(EPOLL_CREATE_ERR);
}
}
// 添加事件
bool AddEvent(int fd, uint32_t event)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
if (n < 0)
{
logMessage(Warning, "AddEvent's epoll_ctl fail: %s, errno code %d", strerror(errno), errno);
return false;
}
return true;
}
// 移除事件
bool DelEvent(int fd)
{
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
if (n < 0)
{
logMessage(Warning, "DelEvent's epoll_ctl fail: %s, errno code %d", strerror(errno), errno);
return false;
}
return true;
}
// 等待事件
int Wait(struct epoll_event* revs, int num, int timeout)
{
return epoll_wait(_epfd, revs, num, timeout);
}
void Close()
{
if (_epfd != default_epfd)
{
close(_epfd);
_epfd = default_epfd;
}
}
~Epoller()
{
Close();
}
private:
int _epfd; // epoll模型的fd
};
運(yùn)行服務(wù)器
運(yùn)行epoll服務(wù)器要做的事情和select
,poll
相似,就是不斷調(diào)用epoll_wait
函數(shù)(對(duì)應(yīng)我們封裝的Wait
函數(shù)),從就緒隊(duì)列當(dāng)中獲取就緒事件進(jìn)行處理即可。
class EpollServer
{
public:
EpollServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
_epoller.Create();
bool is_sucess = _epoller.AddEvent(_listen_fd.Fd(), EPOLLIN);
assert(is_sucess);
}
void Start()
{
// 設(shè)置超時(shí)事件為2s,為了演示的方便這里我們使用阻塞的方式進(jìn)行演示
// int timeout = 2000;
while (true)
{
int n = _epoller.Wait(_revs, _num, -1);
switch (n)
{
case -1:
logMessage(Warning, "epoll_wait fail : %s, errno code : %d", strerror(errno), errno);
break;
case 0:
logMessage(Info, "timeout...");
break;
default:
logMessage(Info, "有%d個(gè)事件就緒了!", n);
HandleEvent(n);
break;
}
}
}
void HandleEvent(int n)
{
// 處理事件
}
~EpollServer()
{
_listen_fd.Close();
_epoller.Close();
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
Epoller _epoller; // epoll模型
static const size_t _num = 64; // _revs的長(zhǎng)度
struct epoll_event _revs[_num]; // epoll_wait就緒事件的緩沖區(qū)
};
事件處理
如果底層就緒隊(duì)列當(dāng)中有就緒事件,那么調(diào)用epoll_wait
函數(shù)時(shí)就會(huì)將底層就緒隊(duì)列中的事件按照線性拷貝的方式拷貝到用戶提供的_revs
數(shù)組當(dāng)中,即_revs
輸出數(shù)組中拷貝的內(nèi)容,從左向后連續(xù)有效的!接下來(lái)epoll服務(wù)器就應(yīng)該對(duì)就緒事件進(jìn)行處理了,事件處理過(guò)程如下:
-
根據(jù)調(diào)用
epoll_wait
時(shí)得到的返回值,來(lái)判斷操作系統(tǒng)向_revs
數(shù)組中拷貝了多少個(gè)struct epoll_event
結(jié)構(gòu),進(jìn)而對(duì)這些文件描述符上的事件進(jìn)行處理。 -
對(duì)于每一個(gè)拷貝上來(lái)的
struct epoll_event
結(jié)構(gòu),如果該結(jié)構(gòu)當(dāng)中的events
當(dāng)中包含讀事件,則說(shuō)明該文件描述符對(duì)應(yīng)的讀事件就緒,但接下來(lái)還需要進(jìn)一步判斷該文件描述符是監(jiān)聽(tīng)套接字還是與客戶端建立的套接字。 -
如果是監(jiān)聽(tīng)套接字的讀事件就緒,則調(diào)用
accept
函數(shù)將底層建立好的連接獲取上來(lái),并調(diào)用AddEvent
函數(shù)將獲取到的套接字添加到epoll模型當(dāng)中,表示下一次調(diào)用Wait
函數(shù)時(shí)需要監(jiān)視該套接字上的事件。 -
如果是與客戶端建立的連接對(duì)應(yīng)的讀事件就緒,則調(diào)用
recv
函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù),并將讀取到的數(shù)據(jù)在服務(wù)器端進(jìn)行打印并轉(zhuǎn)發(fā)。 -
如果在調(diào)用
recv
函數(shù)時(shí)發(fā)現(xiàn)客戶端將連接關(guān)閉或recv
函數(shù)調(diào)用失敗,則epoll服務(wù)器也直接關(guān)閉對(duì)應(yīng)的連接,并調(diào)用DelEvent
函數(shù)將該連接對(duì)應(yīng)的文件描述符從epoll模型中刪除,表示下一次調(diào)用Wait
函數(shù)時(shí)無(wú)需再監(jiān)視該套接字上的事件。
class EpollServer
{
public:
EpollServer(uint16_t port = default_port)
:_port(port)
{
// ...
}
void Start()
{
// ...
}
void HandleEvent(int n)
{
for (int i = 0; i < n; i++)
{
uint32_t event = _revs[i].events;
int fd = _revs[i].data.fd;
// 處理讀事件
if (event & EPOLLIN)
{
// 1.新的連接事件到來(lái)
if (fd == _listen_fd.Fd())
{
Accepter(fd);
}
else
{
// 2.新的讀取事件到來(lái)
Recver(fd);
}
}
else if (event & EPOLLOUT)
{
// 處理寫(xiě)事件
}
else
{
// 處理其他事件,例如異常事件
}
}
}
void Accepter(int fd)
{
std::string client_ip;
uint16_t client_port;
int sockfd = _listen_fd.Accept(&client_ip, &client_port);
if (sockfd < 0)
{
logMessage(Warning, "accept fail : %s, errno code: %d", strerror(errno), errno);
return;
}
// 將新的連接添加到epoll模型中
_epoller.AddEvent(sockfd, EPOLLIN);
}
void Recver(int fd)
{
char buf[1024];
int n = recv(fd, buf, sizeof(buf), 0);
if (n <= 0)
{
if (errno == EINTR)
{
logMessage(Info, "Recver's recv was interrupted by the signal ... ");
return;
}
else if (n == 0)
{
logMessage(Info, "Recver's recv reached the end of file, fd %d will be remove", fd);
}
else
{
logMessage(Warning, "Recver's recv fail: %s, errno code %d", strerror(errno), errno);
}
// 在處理異常時(shí)要先將fd從epoll模型中移除,然后再關(guān)閉文件描述符
_epoller.DelEvent(fd);
close(fd);
}
else
{
// 處理\r\n
buf[n - 2] = 0;
buf[n - 1] = 0;
std::cout << "client: " << buf << std::endl;
std::string response(buf);
response += "[server echo]\n";
// 這里其實(shí)也應(yīng)該先進(jìn)行檢查寫(xiě)事件是否就緒
send(fd, response.c_str(), response.size(), 0);
}
}
~EpollServer()
{
// ...
}
private:
Sock _listen_fd; // 監(jiān)聽(tīng)套接字
uint16_t _port; // 端口號(hào)
Epoller _epoller; // epoll模型
static const size_t _num = 64; // _revs的長(zhǎng)度
struct epoll_event _revs[_num]; // epoll_wait就緒事件的緩沖區(qū)
};
- 這里有一個(gè)注意點(diǎn):在處理異常時(shí)要先將fd從epoll模型中移除,然后再關(guān)閉文件描述符,不然就有可能fd從epoll模型中移除時(shí)訪問(wèn)已經(jīng)釋放的資源。
epoll服務(wù)器測(cè)試
運(yùn)行epoll服務(wù)器時(shí)需要先實(shí)例化出一個(gè)EpollServer對(duì)象,然后讓epoll服務(wù)器直接調(diào)用進(jìn)行Start
函數(shù)后就可以運(yùn)行服務(wù)器了。
#include <iostream>
#include <memory>
#include "EpollServer.hpp"
int main()
{
std::unique_ptr<EpollServer> up(new EpollServer);
up->Start();
return 0;
}
5、epoll的優(yōu)點(diǎn)
-
接口使用方便:雖然拆分成了三個(gè)函數(shù),但是反而使用起來(lái)更方便高效。
-
數(shù)據(jù)拷貝輕量:只在新增監(jiān)視事件的時(shí)候調(diào)用
epoll_ctl
將數(shù)據(jù)從用戶拷貝到內(nèi)核,而select
和poll
每次都需要重新將需要監(jiān)視的事件從用戶拷貝到內(nèi)核。此外,調(diào)用epoll_wait
獲取就緒事件時(shí),只會(huì)拷貝就緒的事件,不會(huì)進(jìn)行不必要的拷貝操作。 -
優(yōu)秀的回調(diào)機(jī)制:避免操作系統(tǒng)主動(dòng)輪詢檢測(cè)事件就緒,而是采用回調(diào)函數(shù)的方式,將就緒的文件描述符結(jié)構(gòu)加入到就緒隊(duì)列中。
-
沒(méi)有數(shù)量限制:監(jiān)視的文件描述符數(shù)目無(wú)上限,只要內(nèi)存允許,就可以一直向紅黑樹(shù)當(dāng)中新增節(jié)點(diǎn)。
一個(gè)注意點(diǎn):
有人說(shuō)epoll中使用了內(nèi)存映射機(jī)制,內(nèi)核可以直接將底層就緒隊(duì)列通過(guò)mmap的方式映射到用戶態(tài),此時(shí)用戶就可以直接讀取到內(nèi)核中就緒隊(duì)列當(dāng)中的數(shù)據(jù),避免了內(nèi)存拷貝的額外性能開(kāi)銷。
-
這種說(shuō)法是錯(cuò)誤的,實(shí)際操作系統(tǒng)并沒(méi)有做任何映射機(jī)制,因?yàn)椴僮飨到y(tǒng)是不相信任何人的,操作系統(tǒng)不會(huì)讓用戶進(jìn)程直接訪問(wèn)到內(nèi)核的數(shù)據(jù)的,因此用戶要獲取內(nèi)核當(dāng)中的數(shù)據(jù),勢(shì)必還是需要將內(nèi)核的數(shù)據(jù)拷貝到用戶空間。
-
在
epoll
的內(nèi)核源碼中,epoll_wait
實(shí)現(xiàn)的內(nèi)核代碼中調(diào)用了__put_user
函數(shù),這個(gè)函數(shù)的作用就是就是將數(shù)據(jù)從內(nèi)核拷貝到用戶空間。
epoll與select和poll的不同之處
-
在使用
select
和poll
時(shí),都需要借助第三方數(shù)組來(lái)維護(hù)歷史上的文件描述符以及需要監(jiān)視的事件,這個(gè)第三方數(shù)組是由用戶自己維護(hù)的,對(duì)該數(shù)組的增刪改操作都需要用戶自己來(lái)進(jìn)行。 -
使用epoll時(shí),不需要用戶自己維護(hù)所謂的第三方數(shù)組,epoll底層的紅黑樹(shù)就充當(dāng)了這個(gè)第三方數(shù)組的功能,并且該紅黑樹(shù)的增刪改操作都是由內(nèi)核維護(hù)的,用戶只需要調(diào)用
epoll_ctl
讓內(nèi)核對(duì)該紅黑樹(shù)進(jìn)行對(duì)應(yīng)的操作即可。 -
在使用多路轉(zhuǎn)接接口時(shí),數(shù)據(jù)流都有兩個(gè)方向,一個(gè)是用戶告知內(nèi)核,一個(gè)是內(nèi)核告知用戶。select和poll將這兩件事情都交給了同一個(gè)函數(shù)來(lái)完成,而epoll在接口層面上就將這兩件事進(jìn)行了分離,epoll通過(guò)調(diào)用
epoll_ctl
完成用戶告知內(nèi)核,通過(guò)調(diào)用epoll_wait
完成內(nèi)核告知用戶。
6、epoll工作方式
epoll有兩種工作方式,分別是水平觸發(fā)工作模式和邊緣觸發(fā)工作模式。
- 水平觸發(fā)(LT,Level Triggered)
只要底層有事件就緒的事件,或者就緒的事件沒(méi)有被處理完全,epoll就會(huì)一直通知用戶,就像數(shù)字電路當(dāng)中的高電平觸發(fā)一樣,只要一直處于高電平狀態(tài),則會(huì)一直觸發(fā),直到我們的電平狀態(tài)變?yōu)榱说碗娖健?/p>
epoll默認(rèn)狀態(tài)下就是LT工作模式,select
和poll
其實(shí)默認(rèn)狀態(tài)下也是LT工作模式。
- 由于在LT工作模式下,只要底層有事件就緒就會(huì)一直通知用戶,因此當(dāng)epoll檢測(cè)到底層讀事件就緒時(shí),可以不立即進(jìn)行處理,或者只處理一部分,因?yàn)橹灰讓訑?shù)據(jù)沒(méi)有處理完,下一次epoll還會(huì)通知用戶事件就緒。
實(shí)驗(yàn):
在前面我們寫(xiě)的代碼中,我們只需要EpollServer.hpp
中的EpollServer
中的Start
函數(shù)中的HandleEvent
函數(shù)調(diào)用給注釋掉,即有事件到來(lái)了,我們不去處理,當(dāng)我們循環(huán)再次調(diào)用epoll_wait
時(shí),由于就緒的事件沒(méi)有處理,就緒隊(duì)列中對(duì)應(yīng)的節(jié)點(diǎn)沒(méi)有被清除,所以下面我們使用的是阻塞調(diào)用就形同虛設(shè)了(因?yàn)榫途w隊(duì)列一直有沒(méi)有被處理的數(shù)據(jù))。
然后運(yùn)行我們修改過(guò)的代碼,我們應(yīng)該看到當(dāng)我們連接服務(wù)器時(shí),有一個(gè)讀事件發(fā)生,我們不去處理,服務(wù)器一直在給我們通知。
void Start()
{
// int timeout = 2000;
while (true)
{
int n = _epoller.Wait(_revs, _num, -1);
switch (n)
{
case -1:
logMessage(Warning, "epoll_wait fail : %s, errno code : %d", strerror(errno), errno);
break;
case 0:
logMessage(Info, "timeout...");
break;
default:
logMessage(Info, "有%d個(gè)事件就緒了!", n);
//HandleEvent(n);
break;
}
}
}
通過(guò)這個(gè)現(xiàn)象我們能看出epoll
的默認(rèn)的LT工作模式。
- 邊緣觸發(fā)(ET,Edge Triggered)
只有底層就緒事件數(shù)量由無(wú)到有或由有到多發(fā)生變化的時(shí)候,epoll
才會(huì)通知用戶,就像數(shù)字電路當(dāng)中的上升沿觸發(fā)一樣,只有當(dāng)電平由低變高的那一瞬間才會(huì)觸發(fā)。
如果要將epoll改為ET工作模式,則需要在添加事件時(shí)設(shè)置EPOLLET
選項(xiàng)。
-
由于在ET工作模式下,只有底層就緒事件無(wú)到有或由有到多發(fā)生變化的時(shí)候才會(huì)通知用戶,因此當(dāng)
epoll
檢測(cè)到底層讀事件就緒時(shí),必須立即進(jìn)行處理,而且必須全部處理完畢,因?yàn)橛锌赡艽撕蟮讓釉僖矝](méi)有事件就緒,那么epoll就再也不會(huì)通知用戶進(jìn)行事件處理,此時(shí)沒(méi)有處理完的數(shù)據(jù)就相當(dāng)于丟失了。 -
ET工作模式下epoll通知用戶的次數(shù)一般比LT少,因此ET的性能一般比LT性能更高,Nginx就是默認(rèn)采用ET模式使用epoll的。
實(shí)驗(yàn):
和剛才的實(shí)驗(yàn)同理,這次依然我們需要EpollServer.hpp
中的EpollServer
中的Start
函數(shù)中的HandleEvent
函數(shù)調(diào)用給注釋掉,同時(shí)將EpollServer
的構(gòu)造函數(shù)中的AddEvent
中的事件添加| EPOLLET
,有事件到來(lái)了,我們依然不進(jìn)行處理,運(yùn)行代碼我們看到epoll
確實(shí)只給我們通知了一次!
EpollServer(uint16_t port = default_port)
:_port(port)
{
_listen_fd.Socket();
_listen_fd.Bind(_port);
_listen_fd.Listen();
// 創(chuàng)建的epoll模型
_epoller.Create();
// 添加要關(guān)心的事件
_epoller.AddEvent(_listen_fd.Fd(), EPOLLIN | EPOLLET);
}
LT和ET工作模式的對(duì)比
-
通知效率:在ET模式下,一個(gè)文件描述符就緒之后,用戶不會(huì)反復(fù)收到通知,一般來(lái)說(shuō)ET比LT的通知效率更高效。
一次通知就是一次系統(tǒng)調(diào)用返回,一次返回必定對(duì)應(yīng)一次調(diào)用,相比于LT模式ET有效減少系統(tǒng)調(diào)用次數(shù),所以ET的這個(gè)特點(diǎn)能夠提升ET模式的工作效率。 -
數(shù)據(jù)讀寫(xiě)的方式
- 對(duì)于ET模式,由無(wú)到有或由有到多發(fā)生變化,才會(huì)通知上層讀取數(shù)據(jù),如果本次通知的數(shù)據(jù)上層不全部讀取完畢,此時(shí)沒(méi)有處理完的數(shù)據(jù)就有可能讀取不到了,這就倒逼程序員必須一次將本輪數(shù)據(jù)全部讀取完畢。
于是就會(huì)產(chǎn)生下面的邏輯鏈:
所以在ET模式下所有的讀取和寫(xiě)入都必須是非阻塞的接口!
ET倒逼程序員盡快取走所有的數(shù)據(jù),本質(zhì)是: 讓TCP底層更新出更大的接受窗口,從而在較大概率上提供對(duì)方的滑動(dòng)窗口的大小,提高發(fā)送效率! - 在LT模式下,由于當(dāng)我們數(shù)據(jù)沒(méi)有讀取完畢時(shí),
epoll
會(huì)給我們進(jìn)行通知,所以我們既可以采用一次讀取完畢,或者一次讀取一些,在LT模式下我們也能夠使用非阻塞接口進(jìn)行讀取而且我們還能使用阻塞接口進(jìn)行讀取,因?yàn)楫?dāng)epoll
給我們通知時(shí),說(shuō)明數(shù)據(jù)肯定已經(jīng)就緒了!當(dāng)然為了IO的效率我們一般選擇一次讀取完畢。
- 對(duì)于ET模式,由無(wú)到有或由有到多發(fā)生變化,才會(huì)通知上層讀取數(shù)據(jù),如果本次通知的數(shù)據(jù)上層不全部讀取完畢,此時(shí)沒(méi)有處理完的數(shù)據(jù)就有可能讀取不到了,這就倒逼程序員必須一次將本輪數(shù)據(jù)全部讀取完畢。
所以我們的LT模式在使用非阻塞接口并且選擇一次讀取完畢時(shí),其IO的效率和ET模式是一樣的,所以論效率的上限是 ET = LT
的,但是LT模式還能夠使用阻塞式接口進(jìn)行讀取,是不是ET模式就沒(méi)有必要存在了呢?
答案是:不是,在服務(wù)器中我們一般使用的還是ET模式。
- 首先:是因?yàn)镋T的工作效率是比較高的!
- 其次:ET是必須使用非阻塞接口的,如果中間沒(méi)有使用非阻塞接口很容易將問(wèn)題的bug暴漏出來(lái)進(jìn)行測(cè)試修正,而LT如果中間使用了非阻塞接口不容易將問(wèn)題暴漏出來(lái),就不容易進(jìn)行修正。
當(dāng)然LT也有它的一些適用場(chǎng)景,例如在一個(gè)IO量很大又需要實(shí)時(shí)性很高的場(chǎng)景,我們就可以選擇LT進(jìn)行多次IO,邊讀取邊分析邊響應(yīng),如果是ET模式就不能夠做到及時(shí)的響應(yīng)了。
ET工作模式下應(yīng)該如何進(jìn)行讀寫(xiě)
因?yàn)樵贓T工作模式下,只有底層就緒事件無(wú)到有或由有到多發(fā)生變化的時(shí)候才會(huì)通知用戶,這就倒逼用戶當(dāng)讀事件就緒時(shí)必須一次性將數(shù)據(jù)全部讀取完畢,當(dāng)寫(xiě)事件就緒時(shí)必須一次性將發(fā)送緩沖區(qū)寫(xiě)滿,否則可能再也沒(méi)有機(jī)會(huì)進(jìn)行讀寫(xiě)了。
-
因此讀數(shù)據(jù)時(shí)必須循環(huán)調(diào)用recv函數(shù)進(jìn)行讀取,寫(xiě)數(shù)據(jù)時(shí)必須循環(huán)調(diào)用send函數(shù)進(jìn)行寫(xiě)入。
-
當(dāng)?shù)讓幼x事件就緒時(shí),循環(huán)調(diào)用recv函數(shù)進(jìn)行讀取,直到某次調(diào)用recv讀取時(shí),實(shí)際讀取到的字節(jié)數(shù)小于期望讀取的字節(jié)數(shù),則說(shuō)明本次底層數(shù)據(jù)已經(jīng)讀取完畢了。
-
但有可能最后一次調(diào)用recv讀取時(shí),剛好實(shí)際讀取的字節(jié)數(shù)和期望讀取的字節(jié)數(shù)相等,但此時(shí)底層數(shù)據(jù)也恰好讀取完畢了,如果我們?cè)僬{(diào)用recv函數(shù)進(jìn)行讀取,那么recv就會(huì)因?yàn)榈讓記](méi)有數(shù)據(jù)而被阻塞住。
-
而這里的阻塞是非常嚴(yán)重的,就比如我們這里寫(xiě)的服務(wù)器都是單進(jìn)程的服務(wù)器,如果recv被阻塞住,并且此后該數(shù)據(jù)再也不就緒,那么就相當(dāng)于我們的服務(wù)器掛掉了,因此在ET工作模式下循環(huán)調(diào)用recv函數(shù)進(jìn)行讀取時(shí),必須將對(duì)應(yīng)的文件描述符設(shè)置為非阻塞狀態(tài)。
-
調(diào)用send函數(shù)寫(xiě)數(shù)據(jù)時(shí)也是同樣的道理,需要循環(huán)調(diào)用send函數(shù)進(jìn)行數(shù)據(jù)的寫(xiě)入,并且必須將對(duì)應(yīng)的文件描述符設(shè)置為非阻塞狀態(tài)。
強(qiáng)調(diào): ET工作模式下,recv
和send
操作的文件描述符必須設(shè)置為非阻塞狀態(tài),這是必須的,不是可選的!文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-756782.html
參考資料:
IO多路轉(zhuǎn)接 ——— select、poll、epoll文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-756782.html
到了這里,關(guān)于【Linux】I/O多路轉(zhuǎn)接技術(shù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!