目錄
一、poll?
二、epoll
1.epoll
2.epoll的函數(shù)接口
①epoll_create
②epoll_ctl
③epoll_wait
3.操作原理
三、epoll服務(wù)器編寫
1.日志打印
2.TCP服務(wù)器
3.Epoll
①雛形
②InitEpollServer 與 RunServer
③HandlerEvent
四、Epoll的工作模式
1.LT模式與ET模式
2.基于LT模式的epoll服務(wù)器
①整體框架
②處理BUG
③優(yōu)化結(jié)構(gòu)? ? ? ??????
④異常處理
④序列化與反序列化
⑤優(yōu)化
⑥Reactor模式與Proactor模式
一、poll
? ? ? ? 該函數(shù)的作用也是如同select一樣在IO中負(fù)責(zé)等,但是它不用每次將參數(shù)重設(shè),并且沒有上限。
? ? ? ? 這需要參數(shù)的幫助。
? ? ? ? 第一個(gè)參數(shù)的類型是一個(gè)指針,其中存放的類型是一個(gè)結(jié)構(gòu)體。
????????fd為文件描述符,events是負(fù)責(zé)告訴內(nèi)核關(guān)注什么事件,revents是負(fù)責(zé)告訴用戶關(guān)注的事件是否就緒。替代了select中輸入輸出參數(shù)為同一個(gè)的問(wèn)題。我們還可以將參數(shù)稍作調(diào)整更為直觀。
int poll(struct pollfd fds[],nfds_t nfds,int timeout);??
? ? ? ? 我們要關(guān)注讀事件寫事件怎么傳呢?
? ? ? ? 這些都是宏,我們想關(guān)注多個(gè)事件可以將他們對(duì)應(yīng)的宏按位與在一起,再傳入。??
? ? ? ? 第二個(gè)參數(shù)的類型是nfds_t,實(shí)則是long int。
? ? ? ? 該參數(shù)傳遞的是前一個(gè)參數(shù)作為數(shù)組中的元素個(gè)數(shù)。
? ? ? ? 第三個(gè)參數(shù),與select中的timeout不同的是 ,這里的timeout的參數(shù)不是結(jié)構(gòu)體,直接表示的是微秒。
? ? ? ? 關(guān)于poll的服務(wù)器編寫就不多闡述了,我們將上篇文章的select稍微修改下,就能使用。大家可以移步先去觀看上篇文章。
代碼:
#include <iostream>
#include <poll.h>
#include "Sock.hpp"
using namespace std;
#define NUM 1024
struct pollfd fdsArray[NUM]; // 輔助數(shù)組 里面存放歷史文件描述符
#define DEFAUIT -1 // 默認(rèn)
void Usage(string process)
{
cout << "Please entry" << process << " port" << endl;
}
static void ShowArray()
{
cout << "當(dāng)前的文件描述符為: ";
for (int i = 0; i < NUM; i++)
{
if (fdsArray[i].fd == DEFAUIT)
continue;
cout << fdsArray[i].fd << ' ';
}
cout << endl;
}
static void HandlerEvent(int listensock)
{
for (int j = 0; j < NUM; j++)
{
if (fdsArray[j].fd == DEFAUIT)
continue;
if (j == 0 && fdsArray[j].fd == listensock)
{
if (fdsArray[j].revents & POLLIN)
{
cout << "新連接到來(lái),需要處理" << endl;
string clientip;
uint16_t clientport = 0;
int sock = Sock::Accept(listensock, &clientip, &clientport); // 這里不會(huì)阻塞
if (sock < 0)
return;
cout << "獲取新連接成功 " << clientip << ":" << clientport << " Sock:" << sock << endl;
int i = 0;
for (; i < NUM; i++)
{
if (fdsArray[i].fd == DEFAUIT)
break;
}
if (i == NUM)
{
cerr << "服務(wù)器已經(jīng)到達(dá)了上限" << endl;
close(sock);
}
else
{
// 將文件描述符放入fdsArray中
fdsArray[i].fd = sock;
fdsArray[i].events = POLLIN;
fdsArray[i].revents = 0;
// debug
ShowArray();
}
}
}
else
{
// 處理其他的文件描述符的IO事件
if (fdsArray[j].revents & POLLIN)
{
char buffer[1024];
ssize_t s = recv(fdsArray[j].fd, buffer, sizeof(buffer), 0);
// 這里的阻塞讀取真的會(huì)阻塞住嗎?并不會(huì),因?yàn)樽叩竭@里select已經(jīng)幫我們等了,并且此時(shí)事件就緒。
if (s > 0)
{
buffer[s] = 0;
cout << "client[" << fdsArray[j].fd << "]"
<< " # " << buffer << endl;
}
else if (s == 0)
{
cout << "client[" << fdsArray[j].fd << "] "
<< "quit"
<< " server will close " << fdsArray[j].fd << endl;
fdsArray[j].fd = DEFAUIT; // 恢復(fù)默認(rèn)
fdsArray[j].events = 0;
fdsArray[j].revents = 0;
close(fdsArray[j].fd); // 關(guān)閉sock
ShowArray(); // debug
}
else
{
cerr << "recv error" << endl;
fdsArray[j].fd = DEFAUIT; // 恢復(fù)默認(rèn)
fdsArray[j].events = 0;
fdsArray[j].revents = 0;
close(fdsArray[j].fd); // 關(guān)閉sock
ShowArray(); // debug
}
}
}
}
}
int main(int argc, char **argv)
{
if (argc != 2)
{
Usage(argv[0]);
exit(1);
}
int listensocket = Sock::Socket();
Sock::Bind(listensocket, atoi(argv[1]));
Sock::Listen(listensocket);
for (int i = 0; i < NUM; i++)
{
fdsArray[i].fd = DEFAUIT;
fdsArray[i].events = 0;
fdsArray[i].revents = 0;
}
fdsArray[0].fd = listensocket; // 默認(rèn)fdsArray第一個(gè)元素存放
fdsArray[0].events = POLLIN;
int timeout = 100000;
while (1)
{
int n = poll(fdsArray, NUM, timeout);
switch (n)
{
case 0:
// timeout
cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
break;
// error
cout << "select error : " << strerror(errno) << endl;
case -1:
break;
default:
// 等待成功
HandlerEvent(listensocket);
break;
}
}
}
現(xiàn)象:
二、epoll
1.epoll
? ? ? ? 按照man手冊(cè)中說(shuō)法為:是為處理大批量句柄而做了改進(jìn)的poll。
? ? ? ? epoll被公認(rèn)為性能最好的多路I/O就緒通知方法。
?2.epoll的函數(shù)接口
①epoll_create
? ? ? ? 該函數(shù)的目的是創(chuàng)建一個(gè)epoll句柄。
????????自從linux2.6.8之后,size參數(shù)是被忽略的。
? ? ? ? 返回值:成功時(shí)返回一個(gè)文件描述符,失敗時(shí)返回-1。
②epoll_ctl
? ? ? ? 該函數(shù)的作用是為對(duì)epoll句柄中添加特點(diǎn)的文件描述符對(duì)應(yīng)的事件。也就是用戶告訴內(nèi)核要關(guān)注哪些事件。
? ? ? ? epfd:傳入epoll_create的返回值。
? ? ? ? op:選項(xiàng),有EPOLL_CTL_ADD、?EPOLL_CTL_MOD、EPOLL_CTL_DEL。
? ? ? ? fd:文件描述符。
? ? ? ? event:對(duì)應(yīng)的事件。
? ? ? ? 其中的結(jié)構(gòu)體為:
③epoll_wait
? ? ? ? 作用是等待文件描述符對(duì)應(yīng)的事件是否就緒。
? ? ? ? epfd:傳入epoll_create的返回值。
????????events:對(duì)應(yīng)的事件。
? ? ? ? maxevents:當(dāng)前關(guān)注的文件描述符的最大值。
? ? ? ? timeout:等待時(shí)間。
3.操作原理
? ? ? ? 了解了上文這么多函數(shù),其實(shí)僅僅是看并沒有真正理解到epoll是怎么工作的,下面來(lái)講講操作原理。
? ? ? ? 操作系統(tǒng)如何得知網(wǎng)絡(luò)中的數(shù)據(jù)到來(lái)了?
? ? ? ? 網(wǎng)卡中得到數(shù)據(jù),會(huì)向CPU發(fā)送硬件中斷,調(diào)用OS預(yù)設(shè)的中斷函數(shù),負(fù)責(zé)從外設(shè)進(jìn)行數(shù)據(jù)拷貝,從外設(shè)拷貝內(nèi)核緩沖區(qū)。
? ? ? ? epoll_create創(chuàng)建的epoll句柄是什么?
? ? ? ? epoll句柄可以理解為epoll模型。
? ? ? ? epoll_create會(huì)創(chuàng)建一個(gè)空的紅黑樹,一個(gè)就緒隊(duì)列,創(chuàng)建對(duì)應(yīng)的回調(diào)函數(shù)。
? ? ? ? 這里的紅黑樹中的節(jié)點(diǎn)存放著要關(guān)注的文件描述符和事件等信息,屬于用戶告訴內(nèi)核。這里的樹等價(jià)于當(dāng)初寫poll、select維護(hù)的數(shù)組。
? ? ? ? ?就緒隊(duì)列存放著已經(jīng)就緒的事件。屬于內(nèi)核告訴用戶。? ? ? ? 回調(diào)函數(shù)是在當(dāng)數(shù)據(jù)到來(lái)發(fā)生硬件中斷,os調(diào)用中斷函數(shù)中拷貝之后使用。
? ? ? ? ?該回調(diào)函數(shù)會(huì)依據(jù)就緒的數(shù)據(jù),獲取到對(duì)應(yīng)的文件描述符和對(duì)應(yīng)的事件,依據(jù)這兩個(gè)內(nèi)容構(gòu)建一個(gè)fd_queue節(jié)點(diǎn),插入到就緒隊(duì)列中。
? ? ? ? 這三個(gè)合起來(lái)為epoll模型。
? ? ? ? 這個(gè)epoll模型是調(diào)用epoll_create會(huì)創(chuàng)建的,但是為什么返回值是一個(gè)文件描述符呢?
? ? ? ? 我們知道文件描述符其實(shí)就是數(shù)組的下標(biāo),該數(shù)組存放著指向的管理文件的結(jié)構(gòu)體的指針。具體的大家可以看我這篇文章。
? ? ? ? struct file中就存放著epoll的數(shù)據(jù)結(jié)構(gòu)。
? ? ? ? 由文件描述符找到文件的結(jié)構(gòu)體,就可以找到紅黑樹,以及就緒隊(duì)列等關(guān)于epoll的數(shù)據(jù)。? ? ? ??
????????epoll_ctl則是來(lái)維護(hù)這個(gè)紅黑樹的,負(fù)責(zé)增加節(jié)點(diǎn)刪除節(jié)點(diǎn),也就是維護(hù)用戶告訴內(nèi)核信息的函數(shù)。
? ? ? ? epoll_wait則是來(lái)從內(nèi)核中的就緒隊(duì)列拿數(shù)據(jù),有數(shù)據(jù)則證明有事件就緒了,以前poll需要便利數(shù)組去查看是否有文件描述符對(duì)應(yīng)的事件就緒,現(xiàn)在只用通過(guò)檢查就緒隊(duì)列是否有數(shù)據(jù)就知道是否有文件描述符對(duì)應(yīng)的事件就緒。時(shí)間復(fù)雜度為O(1)。
? ? ? ?
三、epoll服務(wù)器編寫
1.日志打印
Log.hpp:
#include <cstdio>
#include <cstdarg>
#include <cassert>
#include <stdlib.h>
#include <time.h>
#define DEBUG 0
#define NOTICE 1
#define WARNING 2
#define FATAL 3
const char *log_level[] = {"DEBUG", "NOTICE", "WARNING", "FATAL"};
void logMessage(int level, const char *format, ...)
{
assert(level >= DEBUG);
assert(level <= FATAL);
char logInfor[1024];
char *name = getenv("USER");
va_list ap;
va_start(ap, format);
vsnprintf(logInfor, sizeof(logInfor) - 1, format, ap);
va_end(ap);
FILE * out = (level == FATAL) ? stderr : stdout;
fprintf(out,"%s | %u | %s | %s\n",\
log_level[level],\
(unsigned int)time(nullptr),\
name == nullptr ? "Unkown" : name,\
logInfor
);
}
2.TCP服務(wù)器
Sock.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>
class Sock
{
public:
static const int gbacklog = 20;
static int Socket()
{
int listenSock = socket(PF_INET, SOCK_STREAM, 0);
if (listenSock < 0)
{
exit(1);
}
int opt = 1;
setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
return listenSock;
}
static void Bind(int socket, uint16_t port)
{
struct sockaddr_in local; // 用戶棧
memset(&local, 0, sizeof local);
local.sin_family = PF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
// 2.2 本地socket信息,寫入sock_對(duì)應(yīng)的內(nèi)核區(qū)域
if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0)
{
exit(2);
}
}
static void Listen(int socket)
{
if (listen(socket, gbacklog) < 0)
{
exit(3);
}
}
static int Accept(int socket, std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
// 獲取鏈接失敗
return -1;
}
if (clientport)
*clientport = ntohs(peer.sin_port);
if (clientip)
*clientip = inet_ntoa(peer.sin_addr);
return serviceSock;
}
};
3.Epoll
①雛形
EpollServer.hpp:
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include "log.hpp"
using namespace std;
class EpollServer
{
public:
EpollServer(uint16_t port)
: port_(port), listensock_(-1), epfd_(-1)
{
}
void InitEpollServer()
{
}
void RunServer()
{
}
~EpollServer()
{
if(listensock_ != -1) close(listensock_);
if(epfd_ != -1) close(epfd_);
}
private:
int listensock_;
int epfd_;
uint16_t port_;
};
epoll.cc:
#include "EpollServer.hpp"
#include "Sock.hpp"
#include "Log.hpp"
#include <memory>
void Usage(string process)
{
cout << "Please entry" << process << " port" << endl;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1])));
epoll->InitEpollServer();
epoll->RunServer();
return 0;
}
②InitEpollServer 與 RunServer
代碼:
void InitEpollServer()
{
listensock_ = Sock::Socket();
Sock::Bind(listensock_, port_);
Sock::Listen(listensock_);
epfd_ = epoll_create(gsize);
if (epfd_ < 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
exit(1);
}
logMessage(DEBUG, "epoll_creatr success,epoll模型創(chuàng)建成功,epfd: %d", epfd_);
}
void RunServer()
{
// 1.先添加listensock_到epoll模型中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listensock_;
int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, listensock_, &ev);
if (a != 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
exit(1);
}
struct epoll_event revs[num];
int timeout = 10000;
while (1)
{
int n = epoll_wait(epfd_, revs, num, timeout);
switch (n)
{
case 0:
// timeout
cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
break;
// error
cout << "epoll_wait error : " << strerror(errno) << endl;
case -1:
break;
default:
// 等待成功
cout<<"event 到來(lái)"<<endl;
break;
}
}
}
現(xiàn)象:
③HandlerEvent
? ? ? ? 下面開始處理就緒事件。首先,我們定義一個(gè)成員變量它的類型是function<int(int)>,并初始化,
? ? ? ? ?我們將自己想要對(duì)文件描述符處理的函數(shù)傳入在類的實(shí)例化的過(guò)程中,并在HandlerEvent函數(shù)中調(diào)用該函數(shù)。
int myfuc(int sock)
{
// 這里bug,TCP基于流式,如何保證一次將數(shù)據(jù)讀取完畢,一會(huì)解決
char buff[1024];
int sz = recv(sock, buff, sizeof buff -1, 0);
if (sz > 0)
{
buff[sz] = 0;
logMessage(DEBUG, "client[%d]:%s", sock, buff);
return sz;
}
return sz;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1]), myfuc));
epoll->InitEpollServer();
epoll->RunServer();
return 0;
}
? ? ? ??
void HandlerEvent(struct epoll_event *revs, int n)
{
for (int i = 0; i < n; i++)
{
int sock = revs[i].data.fd;
uint32_t revent = revs[i].events;
if (revent & EPOLLIN)
{
// IO
if (sock == listensock_)
{
// 監(jiān)聽套接字
string clientip;
uint16_t clientport = 0;
int iosock = Sock::Accept(listensock_, &clientip, &clientport);
if (iosock < 0)
{
logMessage(FATAL, "Sock error , errno : %d :%s", errno, strerror(errno));
continue;
}
// 托管給epoll
struct epoll_event ev;
ev.data.fd = iosock;
ev.events = EPOLLIN;
int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, iosock, &ev);
assert(a == 0);
(void)a;
}
else
{
// 其他套接字
int n = fuc_(sock);
if (n == 0 || n < 0)
{
int x = epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr);
assert(x == 0);
(void)x;
close(sock);
logMessage(DEBUG,"clinet[%d] exit",sock);
}
}
}
else
{
// epollout 后續(xù)處理
}
}
}
?現(xiàn)象:
四、Epoll的工作模式
1.LT模式與ET模式
? ? ? ? LT(level trigger)水平觸發(fā),ET(edge trigger)邊緣觸發(fā)。
????????LT模式通俗來(lái)講,只要底層有數(shù)據(jù),就一直通知上層。
????????ET模式,只要底層有數(shù)據(jù)且是從無(wú)到有,從有到多,發(fā)生變化時(shí)才會(huì)通知上層。
? ? ? ? 例如:你對(duì)媽媽說(shuō)今天午飯LT模式,當(dāng)媽媽將飯做好,等你吃飯時(shí),并且一直在喊你,寶貝兒子快來(lái)吃飯,直到你去吃飯。這次你對(duì)媽媽說(shuō)今天午飯ET模式,當(dāng)媽媽將飯做好,飯從無(wú)到有,此時(shí)媽媽叫你吃飯,你沒有去,此后飯的量恒定沒有變化,媽媽就再也不會(huì)通知你。
? ? ? ? 這兩個(gè)模式哪一個(gè)更為高效呢?ET模式更為高效。以TCP通信來(lái)講,并沒有多少數(shù)據(jù)的到來(lái),底層卻一直在提醒上層,多數(shù)的提醒的都是在提醒同一個(gè)數(shù)據(jù)就緒。
? ? ? ? LT模式,只要有數(shù)據(jù)就會(huì)提醒,我們可以先去處理其他業(yè)務(wù),等某次提醒時(shí)再處理底層的數(shù)據(jù)。但ET模式,數(shù)據(jù)到來(lái)之后沒有變化,就再也不會(huì)提醒你,所以只能在提醒時(shí)就處理數(shù)據(jù),要不后面再也沒有提醒,這就倒逼程序員一次將數(shù)據(jù)讀完。
? ? ? ? 如何知道底層數(shù)據(jù)已經(jīng)讀完了?只有不斷的去調(diào)用recv、read函數(shù),如果報(bào)錯(cuò)了證明數(shù)據(jù)已經(jīng)讀完了。數(shù)據(jù)已經(jīng)讀完了,但最后一次,沒數(shù)據(jù)了,read卻阻塞住了。
? ? ? ? 所以在ET模式下,所有的文件描述符都要設(shè)置成非阻塞。
? ? ? ? epoll默認(rèn)就是LT模式。
2.基于LT模式的epoll服務(wù)器
①整體框架
? ? ? ? 整體代碼已經(jīng)上傳到gitee上,配合整體代碼觀看,更加直觀便捷。
? ? ? ? 下面我們基于上文的epoll服務(wù)器,但不同與上文,大家請(qǐng)往下看。????????
? ? ? ? 首先先建立一個(gè)類,該類將文件描述符和回調(diào)方法結(jié)合。
Tcpserver.hpp:
class Connection;
using fuc_t = function<int(Connection *)>;
class Connection
{
public:
// 文件描述符
int _sock;
Tcpserver *_ptr;
// 自己的接受和發(fā)送緩沖區(qū)
string _inbuff;
string _outbuff;
// 回調(diào)函數(shù)
fuc_t _readfuc;
fuc_t _writefuc;
fuc_t _exceptfuc;
public:
Connection(int sock, Tcpserver *ptr) : _sock(sock), _ptr(ptr)
{
}
~Connection()
{
}
void SetReadfuc(fuc_t fuc)
{
_readfuc = fuc;
}
void SetWritefuc(fuc_t fuc)
{
_writefuc = fuc;
}
void SetExceptfuc(fuc_t fuc)
{
_exceptfuc = fuc;
}
};
? ? ? ??
Tcpserver.hpp:
class Tcpserver
{
public:
Tcpserver(int port)
{
// 網(wǎng)絡(luò)
_listensock = Sock::Socket();
Sock::Bind(_listensock, port);
Sock::Listen(_listensock);
// epoll
_epfd = Epoller::CreateEpoll();
// add事件
Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);
// 將listensock匹配的connection方法添加到unordered_map中
auto iter = new Connection(_listensock, this);
iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
_conn.insert({_listensock, iter});
// 初始化就緒隊(duì)列
_revs = new struct epoll_event[_revs_num];
}
int Accepter(Connection *conn)
{
string clientip;
uint16_t clientport;
int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
if (sockfd < 0)
{
logMessage(FATAL, "accept error");
return -1;
}
logMessage(DEBUG, "Get a new connect : %d", sockfd);
AddConn(sockfd, EPOLLIN | EPOLLET);
return 0;
}
bool SockinConn(int sock)
{
auto iter = _conn.find(sock);
if (iter == _conn.end())
{
return false;
}
else
{
return true;
}
}
void AddConn(int sock, uint32_t event)
{
// 將文件描述符加入epoll模型中
Epoller::Addevent(_epfd, sock, event);
// 將文件描述符匹配的connection,也加入map中
_conn.insert({sock, new Connection(sock, this)});
logMessage(DEBUG, "將文件描述符匹配的connection加入map成功");
}
void Dispatcher()
{
// 獲取就緒事件
int n = Epoller::GetReadyFd(_epfd, _revs, _revs_num);
// logMessage(DEBUG, "GetReadyFd,epoll_wait");
// 事件派發(fā)
for (int i = 0; i < n; i++)
{
int sock = _revs[i].data.fd;
uint32_t revent = _revs[i].events;
if (EPOLLIN & revent)
{
// 先判空
if (SockinConn(sock) && _conn[sock]->_readfuc)
{
// 該文件描述符對(duì)應(yīng)的讀方法
_conn[sock]->_readfuc(_conn[sock]);
}
}
if (EPOLLOUT & revent)
{
// 先判空
if (SockinConn(sock) && _conn[sock]->_writefuc)
{
// 該文件描述符對(duì)應(yīng)的寫方法
_conn[sock]->_writefuc(_conn[sock]);
}
}
}
}
void Run()
{
while (1)
{
Dispatcher();
}
}
~Tcpserver()
{
if (_listensock != -1)
close(_listensock);
if (_epfd != -1)
close(_epfd);
delete[] _revs;
}
private:
// 1.網(wǎng)絡(luò)sock
int _listensock;
// 2.epoll
int _epfd;
// 3.將epoll與上層代碼結(jié)合
unordered_map<int, Connection *> _conn;
// 4.就緒事件列表
struct epoll_event *_revs;
// 5.就緒事件列表大小
static const int _revs_num = 64;
};
Epoller.hpp:
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <sys/epoll.h>
#include "Log.hpp"
using namespace std;
class Epoller
{
public:
static int CreateEpoll()
{
int size = 128;
int epfd = epoll_create(size);
if (epfd < 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
exit(1);
}
return epfd;
}
static bool Addevent(int epfd, int sock, uint32_t event)
{
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
if (n != 0)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
return false;
}
return true;
}
static int GetReadyFd(int epfd, struct epoll_event evs[], int num)
{
// 阻塞式
int n = epoll_wait(epfd, evs, num, -1);
if (n == -1)
{
logMessage(FATAL, "%d:%s", errno, strerror(errno));
}
return n;
}
};
???????
Epoll.cc:
#include "Tcpserver.hpp"
#include "Sock.hpp"
#include <memory>
using namespace std;
void Usage(string process)
{
cout << "Please entry" << process << " port" << endl;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<Tcpserver> ep(new Tcpserver(atoi(argc[1])));
ep->Run();
return 0;
}
現(xiàn)象:?
? ? ? ? 當(dāng)然還沒有寫完。
? ? ? ? 結(jié)合整體代碼看:①通過(guò)Tcpserver的構(gòu)造函數(shù),先創(chuàng)建網(wǎng)絡(luò)套接字,建立epoll模型②監(jiān)聽listen套接字,并為listen設(shè)置對(duì)應(yīng)的connect類設(shè)置讀方法③將listen套接字添加到epoll模型中④通過(guò)epoll_wait等待事件就緒⑤當(dāng)listen套接字就緒時(shí),調(diào)用回調(diào)函數(shù)其中accept來(lái)獲取新連接的到來(lái)。
? ? ? ? 同時(shí),因?yàn)槲覀兘裉鞂懙氖荅T模式,所以要將文件描述符設(shè)置為非阻塞式,即使用fcntl。
? ? ? ? 目前代碼只寫到了這里,想縱觀全貌更加的詳細(xì)請(qǐng)看我的gitee。
? ? ? ? 下面繼續(xù)增加內(nèi)容,新的連接到來(lái),要為新的連接增加對(duì)應(yīng)的方法。
? ? ? ? 并且要進(jìn)行序列化與反序列化,因?yàn)槲覀兊姆?wù)器是基于TCP的流式讀取,每次讀取我們確保不了讀上來(lái)的數(shù)據(jù)是完整的數(shù)據(jù),所以要做處理。
? ? ? ? 當(dāng)前的處理為我們讀上的數(shù)據(jù)如同:112233X1213Xadasd?!甔’作為分隔符,我們需要讀上來(lái)的數(shù)據(jù),將數(shù)據(jù)進(jìn)行分割。
? ? ? ? 同樣建立一個(gè)回調(diào)方法,在read之后對(duì)數(shù)據(jù)進(jìn)行分割通過(guò)分割,在此之前對(duì)它進(jìn)行初始化。
using callbcak_t = function<int(Connection *, string &)>;
int HandlerPro(Connection *conn, string &message)
{
// 我們能保證走到這里一定是完整的報(bào)文,已經(jīng)解碼
// 接下來(lái)是反序列化
cout << "獲取request : " << message <<"剩余的信息是"<<conn->_inbuff<<endl;
}
int main(int argv, char **argc)
{
if (argv != 2)
{
Usage(argc[0]);
exit(1);
}
unique_ptr<Tcpserver> ep(new Tcpserver(HandlerPro, atoi(argc[1])));
ep->Run();
return 0;
}
int TcpRecver(Connection *conn)
{
// 對(duì)普通套接字讀取
while (true)
{
char buff[1024];
ssize_t sz = recv(conn->_sock, buff, sizeof(buff) - 1, 0);
if (sz > 0)
{
buff[sz] = '\0';
conn->_inbuff += buff;
}
else if (sz == 0)
{
logMessage(DEBUG, "client quit");
}
else if (sz < 0)
{
if (errno == EINTR)
{
// 因?yàn)樾盘?hào)導(dǎo)致IO關(guān)閉,但數(shù)據(jù)還沒有讀完
continue;
}
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 讀完了
break;
}
else
{
// 讀取出錯(cuò)
}
}
}
// 本輪讀取完畢
// 將讀取上來(lái)的 如:xxxxx/3xxxxx/3xxx/3
// 分為 xxxxx 、xxxxx、xxx
vector<string> result;
PackageSplit(conn->_inbuff, &result);
for (auto &message : result)
{
_cb(conn, message);
}
return 0;
}
現(xiàn)象:
②處理BUG
? ? ? ? 我們要解決Accept函數(shù)的BUG,因?yàn)槲覀儺?dāng)前是ET模式,如果當(dāng)前有大量的連接來(lái),系統(tǒng)只會(huì)通知上層一次,而只進(jìn)行一次調(diào)用顯然是不對(duì)的,會(huì)導(dǎo)致讀不上其他到來(lái)的鏈接。所以此時(shí)我們要進(jìn)行循環(huán)讀取,那循環(huán)讀取時(shí)什么時(shí)候停止呢?要根據(jù)Accept函數(shù)中的accept函數(shù)調(diào)用失敗時(shí)設(shè)置的errno來(lái)判別。我們來(lái)看下究竟errno會(huì)被設(shè)置成什么。
? ? ? ? EAGAIN和EWOULDBLOCK,意思為,當(dāng)前的文件描述符被置為非阻塞的且當(dāng)前沒有可接受的連接。意味著已經(jīng)將當(dāng)前到來(lái)的連接讀完了。
? ? ? ? EINTR表示當(dāng)前的系統(tǒng)調(diào)用被一個(gè)捕捉到的信號(hào)中斷在一個(gè)有效的連接到來(lái)之前。
? ? ? ? 這三個(gè)宏值得我們關(guān)注,剩下的宏都是表明accept出錯(cuò)了,所以我們可以這樣修改函數(shù)。
int Accepter(Connection *conn)
{
while (1)
{
string clientip;
uint16_t clientport;
int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
if (sockfd < 0)
{
if (errno == EINTR) // 被信號(hào)中斷
continue;
else if (errno == EAGAIN || errno == EWOULDBLOCK) // 讀取結(jié)束
break;
else
{
// 出錯(cuò)
logMessage(FATAL, "accept error");
return -1;
}
}
logMessage(DEBUG, "Get a new connect : %d", sockfd);
AddConn(sockfd, EPOLLIN | EPOLLET);
}
return 0;
}
????????文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-432425.html
③優(yōu)化結(jié)構(gòu)? ? ? ?
?????????
? ? ? ? 我們會(huì)發(fā)現(xiàn)被標(biāo)注的代碼,與下文的AddConn函數(shù)功能具有相似性,為了避免耦合性更高,我們將這段代碼移到AddConn函數(shù)中。
void AddConn(int sock, uint32_t event, fuc_t readfuc, fuc_t writefuc, fuc_t exceptfuc)
{
if (event & EPOLLET)
Util::SetNonBlock(sock);
// 將文件描述符加入epoll模型中
Epoller::Addevent(_epfd, sock, event);
// 將文件描述符匹配的connection,也加入map中
Connection *conn = new Connection(sock, this);
conn->SetReadfuc(readfuc);
conn->SetWritefuc(writefuc);
conn->SetExceptfuc(exceptfuc);
// conn->SetReadfuc(std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1));
// conn->SetWritefuc(std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1));
// conn->SetExceptfuc(std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));
_conn.insert({sock, conn});
logMessage(DEBUG, "將文件描述符匹配的connection加入map成功");
}
?????????改了AddConn函數(shù),還需對(duì)其他涉及到此函數(shù)的地方大動(dòng)干戈。
Tcpserver(callbcak_t cb, int port) : _cb(cb)
{
// 網(wǎng)絡(luò)
_listensock = Sock::Socket();
Util::SetNonBlock(_listensock);
Sock::Bind(_listensock, port);
Sock::Listen(_listensock);
// epoll
_epfd = Epoller::CreateEpoll();
// 添加listen事件
AddConn(_listensock, EPOLLIN | EPOLLET,
std::bind(&Tcpserver::Accepter, this, std::placeholders::_1), nullptr, nullptr);
// // add事件
// Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);
// // 將listensock匹配的connection方法添加到unordered_map中
// auto iter = new Connection(_listensock, this);
// iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
// _conn.insert({_listensock, iter});
// 初始化就緒隊(duì)列
_revs = new struct epoll_event[_revs_num];
}
int Accepter(Connection *conn)
{
while (1)
{
string clientip;
uint16_t clientport;
int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
if (sockfd < 0)
{
if (errno == EINTR) // 被信號(hào)中斷
continue;
else if (errno == EAGAIN || errno == EWOULDBLOCK) // 讀取結(jié)束
break;
else
{
// 出錯(cuò)
logMessage(FATAL, "accept error");
return -1;
}
}
logMessage(DEBUG, "Get a new connect : %d", sockfd);
AddConn(sockfd, EPOLLIN | EPOLLET,
std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1),
std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1),
std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));
}
return 0;
}
④異常處理
? ? ? ? 就緒的文件描述符掛斷了,就緒的文件描述符出錯(cuò)了怎么辦。我們統(tǒng)一起來(lái)轉(zhuǎn)為文件描述符
讀事件或?qū)懯录途w,必定在讀時(shí)或?qū)憰r(shí)出錯(cuò),我們轉(zhuǎn)而去那時(shí)處理異常。
?
? ? ? ? ?接下來(lái),我們呢要對(duì)異常處理函數(shù)編寫。
代碼:
int TcpExcepter(Connection *conn)
{
// 處理普通套接字異常
// 0.檢測(cè)
if (!SockinConn(conn->_sock))
return -1;
// 1.移除事件
Epoller::DelEvent(_epfd, conn->_sock);
logMessage(DEBUG, "remove epoll event");
// 2.關(guān)閉文件描述符
close(conn->_sock);
logMessage(DEBUG, "close fd :%d", conn->_sock);
// 3.刪除map中的sock對(duì)應(yīng)的conn
// delete conn;
delete _conn[conn->_sock];
logMessage(DEBUG, "delete conn object success");
// 4.去掉sock和conn的映射關(guān)系 上一步只是delete掉了對(duì)象,但是映射關(guān)系還在
_conn.erase(conn->_sock);
logMessage(DEBUG, "erase conn from map success");
}
現(xiàn)象:
④序列化與反序列化
? ? ? ? 何為序列化?何為反序列化?
? ? ? ? 我們?cè)诰W(wǎng)絡(luò)傳輸時(shí)以字符串形式發(fā)送,傳輸時(shí)以二進(jìn)制形式。那我想要給對(duì)方發(fā)送結(jié)構(gòu)體怎么辦,則需要將結(jié)構(gòu)體轉(zhuǎn)換為字符串形式,這個(gè)過(guò)程稱為序列化;對(duì)方收到字符串,通過(guò)反序列化就可以得到結(jié)構(gòu)體。
? ? ? ? 我們今天的結(jié)構(gòu)體如圖所示:
struct Request
{
int _x;
int _y;
char _op;
};
struct Response
{
int _result;
int _exitcode;
};
? ? ? ? ?客戶端發(fā)送一串字符串我們接受到之后,通過(guò)反序列化轉(zhuǎn)換為結(jié)構(gòu)體。
bool Parser(string &in, Request *out)
{
// 反序列化
// 1 + 1, 2 * 4, 5 * 9, 6 *1
std::size_t spaceOne = in.find(SPACE);
if (std::string::npos == spaceOne)
return false;
std::size_t spaceTwo = in.rfind(SPACE);
if (std::string::npos == spaceTwo)
return false;
std::string dataOne = in.substr(0, spaceOne);
std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);
std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));
if (oper.size() != 1)
return false;
// 轉(zhuǎn)成內(nèi)部成員
out->_x = atoi(dataOne.c_str());
out->_y = atoi(dataTwo.c_str());
out->_op = oper[0];
return true;
}
void Serialize(Response &in, string *out)
{
// 序列化
// "exitCode_ result_"
std::string ec = std::to_string(in._exitcode);
std::string res = std::to_string(in._result);
*out = ec;
*out += SPACE;
*out += res;
*out += CRLF;
}
? ? ? ? 接下來(lái),去文件描述符對(duì)應(yīng)的讀方法,調(diào)用該函數(shù)。
Response Calculate(Request &req)
{
Response resp = {0,0};
switch (req._op)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '/':
{
if (req._y == 0)
{
resp._exitcode = 1; // 1 除零錯(cuò)誤
resp._result = INT32_MAX;
}
else
resp._result = req._x / req._y;
break;
}
case '%':
{
if (req._y == 0)
{
resp._exitcode = 2; // 2 模零錯(cuò)誤
resp._result = INT32_MAX;
}
else
resp._result = req._x % req._y;
break;
}
default:
resp._exitcode = 3; // 非法輸入
break;
}
return resp;
}
int HandlerPro(Connection *conn, string &message)
{
// 我們能保證走到這里一定是完整的報(bào)文,已經(jīng)解碼
cout << "獲取request : " << message << endl;
// 1 * 1
// 接下來(lái)是反序列化
Request req;
if (Parser(message, &req) == false)
{
return -1;
}
// 業(yè)務(wù)處理
Response resp = Calculate(req);
// 序列化
string out;
Serialize(resp, &out);
// 發(fā)送給client
conn->_outbuff += out;
// 發(fā)送
}
? ? ? ? 能不能直接調(diào)用send方法無(wú)腦的直接向客戶端發(fā)送呢?
? ? ? ? 首先要知道寫的緩沖區(qū)是否已滿,如何檢測(cè)緩沖區(qū)已滿?
? ? ? ? 在LT模式中,當(dāng)我們想寫時(shí)只需要將對(duì)應(yīng)的文件描述符所對(duì)應(yīng)的EPOLLOUT添加事件中,在我們今天的編寫代碼中,在whlie循環(huán)中的事件檢測(cè)中,當(dāng)檢測(cè)到是EPOLLOUT事件時(shí),我們就可以在回調(diào)函數(shù)中調(diào)用send函數(shù)。
? ? ? ? 在ET模式中,我們也可以使用上面的方法,但是ET模式追求高效,所以一般會(huì)直接發(fā)送數(shù)據(jù),如果數(shù)據(jù)發(fā)送完了,那就可以結(jié)束了;如果沒有發(fā)送完,緩沖區(qū)已滿,就會(huì)選擇去拜托EPOLL去完成后續(xù)任務(wù)。
int HandlerPro(Connection *conn, string &message)
{
// 我們能保證走到這里一定是完整的報(bào)文,已經(jīng)解碼
cout << "---------------" << endl;
// 1 * 1
// 接下來(lái)是反序列化
cout << "獲取request : " << message << endl;
Request req;
if (Parser(message, &req) == false)
{
return -1;
}
// 業(yè)務(wù)處理
Response resp = Calculate(req);
// 序列化
string out;
Serialize(resp, &out);
// 發(fā)送給client
conn->_outbuff += out;
conn->_writefuc(conn);
if (conn->_outbuff.empty())
{
if (conn->_outbuff.empty() == 0)
conn->_ptr->ModSockEvent(conn->_sock, true, false);
else
conn->_ptr->ModSockEvent(conn->_sock, true, true);
}
// // 發(fā)送
// // conn->_ptr->ModSockEvent(conn->_sock, true, true);
cout << "---------------" << endl;
}
void ModSockEvent(int sock, bool read, bool write)
{
uint32_t event = 0;
event |= read ? EPOLLIN : 0;
event |= write ? EPOLLOUT : 0;
Epoller::ModEvent(_epfd, sock, event);
}
static bool ModEvent(int epfd, int sock, uint32_t event)
{
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);
return n == 0;
}
? ? ? ? 因?yàn)?,我們截取字符串的函?shù)沒有找到我們規(guī)定的分隔符的話,就會(huì)將最后的一部字符串歸到下一次讀取時(shí)。原來(lái)的代碼如下所示,所以就會(huì)導(dǎo)致每次輸入一段字符串,只會(huì)輸出最后一段的結(jié)果,我們只需要稍作修改。
void PackageSplit(string &buff, vector<string> *result)
{
// asdasXdasdaXda
// asdas dasda da
while (true)
{
size_t pos = buff.find(SEP);
if (pos == string::npos)
{
break;
}
result->push_back(buff.substr(0, pos));
buff.erase(0, pos + SEP_SZ);
}
}
修改后:
void PackageSplit(string &buff, vector<string> *result)
{
// asdasXdasdaXda
// asdas dasda da
while (true)
{
size_t pos = buff.find(SEP);
if (pos == string::npos)
{
if (buff.size() < 5)
{
buff.clear();
break;
}
result->push_back(buff.substr(0, buff.size()));
buff.clear();
break;
}
result->push_back(buff.substr(0, pos));
buff.erase(0, pos + SEP_SZ);
}
}
現(xiàn)象:
⑤優(yōu)化
? ? ? ? 我們的代碼耦合度太高了,數(shù)據(jù)處理的函數(shù)放在了源文件中,我們另起一個(gè)頭文件,將處理函數(shù)放到該頭文件中,這樣業(yè)務(wù)處理是業(yè)務(wù)處理,網(wǎng)絡(luò)服務(wù)是網(wǎng)絡(luò)服務(wù)。
Service.hpp:
#pragma once
#include "Protocol.hpp"
#include <functional>
using service_t = function<Response (Request &req)>;
Response Calculate(Request &req)
{
Response resp = {0, 0};
switch (req._op)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '/':
{
if (req._y == 0)
{
resp._exitcode = 1; // 1 除零錯(cuò)誤
resp._result = INT32_MAX;
}
else
resp._result = req._x / req._y;
break;
}
case '%':
{
if (req._y == 0)
{
resp._exitcode = 2; // 2 模零錯(cuò)誤
resp._result = INT32_MAX;
}
else
resp._result = req._x % req._y;
break;
}
default:
resp._exitcode = 3; // 非法輸入
break;
}
return resp;
}
????
epoll.cc:
int HandlerProHelp(Connection *conn, string &message,service_t service)
{
// 我們能保證走到這里一定是完整的報(bào)文,已經(jīng)解碼
cout << "---------------" << endl;
// 1 * 1
// 接下來(lái)是反序列化
cout << "獲取request : " << message << endl;
Request req;
if (Parser(message, &req) == false)
{
return -1;
}
// 業(yè)務(wù)處理
Response resp = service(req);
// 序列化
string out;
Serialize(resp, &out);
// 發(fā)送給client
conn->_outbuff += out;
conn->_writefuc(conn);
if (conn->_outbuff.empty())
{
if (conn->_outbuff.empty() == 0)
conn->_ptr->ModSockEvent(conn->_sock, true, false);
else
conn->_ptr->ModSockEvent(conn->_sock, true, true);
}
// // 發(fā)送
// // conn->_ptr->ModSockEvent(conn->_sock, true, true);
cout << "---------------" << endl;
return 0;
}
int HandlerPro(Connection *conn, string &message)
{
return HandlerProHelp(conn,message,Calculate);
}
? ? ? ? 這樣在想處理其他業(yè)務(wù)的時(shí)候,只需要將業(yè)務(wù)處理函數(shù)放入Service.hpp中,然后將源文件中調(diào)用就行。
⑥Reactor模式與Proactor模式
? ? ? ? 我們今天使用的是Reactor模式。
Reactor模式:
? ? ? ? Linux系統(tǒng)中最常用的反應(yīng)器模式。
? ? ? ? 半同步半異步。
? ? ? ? 即負(fù)責(zé)事件的派發(fā),有否則IO,或者說(shuō)業(yè)務(wù)處理。
Proactor模式:
? ? ? ? 只負(fù)責(zé)事件的派發(fā),就緒的事件推送給后臺(tái)的進(jìn)程、線程池,不關(guān)心處理的細(xì)節(jié)。
? ? ? ? 到這里,epoll服務(wù)器的編寫,應(yīng)該就告一段落了,盡管還有數(shù)不清的BUG,和沒有說(shuō)清楚的知識(shí)點(diǎn),但是總體還是挺完美的,那么感謝觀看,我們下次再見。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-432425.html
????????
到了這里,關(guān)于I/O多路轉(zhuǎn)接——epoll服務(wù)器代碼編寫的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!