了解epoll底層邏輯
在我們調(diào)用epoll_create的時候會創(chuàng)建出epoll模型,這個模型也是利用文件描述類似文件系統(tǒng)的方式控制該結(jié)構(gòu)。
在我們調(diào)用epoll_create的時候,就會在內(nèi)核管理中創(chuàng)建一個epoll模型,并且建管理模塊地址給file結(jié)構(gòu)體,file結(jié)構(gòu)體也是連接在管理所有file結(jié)構(gòu)體的數(shù)據(jù)結(jié)構(gòu)中
所以epoll也會給進(jìn)程管理的files返回一個file地址保存在file_array中,并且將該地址在array中的下標(biāo)值返回給上層。
這樣以同一的方式管理epoll模型。所以這就是epoll模型的好處,這和select和poll的方式不同,這兩種并不使用文件描述符
select還需要自己維護(hù)一個關(guān)心事件的fd的數(shù)組,然后再select結(jié)束以后,遍歷該數(shù)組中的fd和輸入輸出型參數(shù)fd_set做查詢關(guān)系(FD_ISSET),這其實(shí)是非常不方便的,在發(fā)生事件我們都需要遍歷全部關(guān)心的事件,查看事件是否發(fā)生。并且因?yàn)槭禽斎胼敵鲂停╢d_set)參數(shù),在響應(yīng)后,之前設(shè)置的監(jiān)視事件失效,所以每次監(jiān)視事件前,都需要重新輸入所有需要監(jiān)聽的事件這是非常不方便的事情
poll在select上做了升級,不再需要額外的數(shù)組保存而是使用pollfd結(jié)構(gòu)體保存fd和關(guān)心事件,但是在響應(yīng)后我們依舊需要遍歷所有關(guān)心的事件,假設(shè)1w個被監(jiān)控的事件只有1個得到了響應(yīng),我們卻需要遍歷1w個事件一個一個檢查是否響應(yīng),這是低效的算法。
并且在操作系統(tǒng)中poll和epoll搭建的服務(wù)器關(guān)心的事件會被一直遍歷查詢是否被響應(yīng),哪怕1w個關(guān)心事件只有一個響應(yīng)是第一個,剩下的9999個事件我們也得查看其是否被響應(yīng)。
我們不應(yīng)該在響應(yīng)得到后遍歷所有的事件,操作系統(tǒng)也應(yīng)該輪詢的檢查所有監(jiān)控事件被響應(yīng),這是低效的2個做法,這就是epoll出現(xiàn)的意義,他的出現(xiàn)解決了這些繁雜的問題,并且在接口使用上做了極大的優(yōu)化。他利用紅黑樹來管理需要監(jiān)視程序員需要關(guān)心的事件和利用準(zhǔn)備隊(duì)列構(gòu)建另一個結(jié)構(gòu),該結(jié)構(gòu)保存了本次等待得到的所有有響應(yīng)的事件。
epoll模型介紹
?
創(chuàng)建epoll模型:調(diào)用epoll_create,在文件描述符表添加一個描述符,生成對應(yīng)的文件結(jié)構(gòu)體結(jié)構(gòu)體保存對應(yīng)生成eventpoll結(jié)構(gòu)體的地址,該結(jié)構(gòu)中有rbr(監(jiān)視事件紅黑樹),rdllist(就緒事件隊(duì)列)等等。? ? ? ??
添加一個fd到epoll中:調(diào)用epoll_ctl,通過epollfd在進(jìn)程文件描述符表中找到對應(yīng)的file,然后在對應(yīng)的文件結(jié)構(gòu)體中的標(biāo)識符將特定指針強(qiáng)轉(zhuǎn)為eventpoll,訪問rbr,增加新結(jié)點(diǎn)在樹中,并且添加對應(yīng)的回調(diào)函數(shù)到對應(yīng)fd的文件結(jié)構(gòu)體中。
接收并讀取報(bào)文:網(wǎng)卡設(shè)備得到數(shù)據(jù),發(fā)送設(shè)備中斷給cpu,cpu根據(jù)接收到的中斷號,在中斷向量表中查找設(shè)備驅(qū)動提供的接口回調(diào),將數(shù)據(jù)從網(wǎng)卡中讀取到OS層的file文件結(jié)構(gòu)體中,然后經(jīng)過部分協(xié)議解析到TCP解析后,根據(jù)端口找到對應(yīng)的進(jìn)程,在進(jìn)程中依靠五元組和fd的映射關(guān)系找到對應(yīng)的file結(jié)構(gòu)體,然后將網(wǎng)卡file的數(shù)據(jù)拷貝到對應(yīng)服務(wù)器鏈接的file下的緩沖區(qū)中,并且調(diào)用其傳入的callback函數(shù)傳入fd通知epoll模型,有數(shù)據(jù)來臨。這個時候我們的epoll在自己的rb樹中依靠fd找到對應(yīng)結(jié)點(diǎn),并且其是否是自己所關(guān)心的事件,找到并且是我們的事件,就會取出其rb中的fd和響應(yīng)的事件做拼接(一個結(jié)點(diǎn)監(jiān)視一個fd的多個事件,發(fā)生響應(yīng)并不是發(fā)生全部響應(yīng),一般都是一個響應(yīng),這個時候就需要將響應(yīng)的事件和fd做結(jié)合,而不是全部事件和fd做結(jié)合)構(gòu)建ready結(jié)點(diǎn)反應(yīng)給上層。
誠然在我們放入事件和拿出響應(yīng)事件的過程中并不是原子的查找,比如訪問ready結(jié)點(diǎn)操作系統(tǒng)可能在構(gòu)建,而我們在拿出,這里就會造成執(zhí)行流混亂的局面,所以這里是需要進(jìn)程鎖的,保證執(zhí)行流正常。
慶幸的是,我們的設(shè)計(jì)者大佬們已將幫我們鎖好了,我們用就好了。
LT和ET的區(qū)別
LT的工作模式:
- 當(dāng)epoll檢測到socket上事件就緒的時候, 可以不立刻進(jìn)行處理. 或者只處理一部分.
- 由于只讀了1K數(shù)據(jù), 緩沖區(qū)中還剩1K數(shù)據(jù), 在第二次調(diào)用 epoll_wait 時, epoll_wait 仍然會立刻返回并通知socket讀事件就緒.
- 直到緩沖區(qū)上所有的數(shù)據(jù)都被處理完, epoll_wait 才不會立刻返回.
- 支持阻塞讀寫和非阻塞讀寫
ET的工作模式:
- 當(dāng)epoll檢測到socket上事件就緒時, 必須立刻處理.
- 雖然只讀了1K的數(shù)據(jù), 緩沖區(qū)還剩1K的數(shù)據(jù), 在第二次調(diào)用 epoll_wait 的時候, epoll_wait 不會再返回了. 也就是說, ET模式下, 文件描述符上的事件就緒后, 只有一次處理機(jī)會,所以需要一次性讀取完畢.
- ET的性能比LT性能更高( epoll_wait 返回的次數(shù)少了很多). Nginx默認(rèn)采用ET模式使用epoll.
- 只支持非阻塞的讀寫
二者對比
- LT是 epoll 的默認(rèn)行為. 使用 ET 能夠減少 epoll 觸發(fā)的次數(shù). 但是代價(jià)就是強(qiáng)逼著程序猿一次響應(yīng)就緒過程中就把 所有的數(shù)據(jù)都處理完.
- 相當(dāng)于一個文件描述符就緒之后, 不會反復(fù)被提示就緒, 看起來就比 LT 更高效一些. 但是在 LT 情況下如果也能做到 每次就緒的文件描述符都立刻處理, 不讓這個就緒被重復(fù)提示的話, 其實(shí)性能也是一樣的.
- 另一方面, ET 的代碼復(fù)雜程度更高了.
ps:使用 ET 模式的 epoll, 需要將文件描述設(shè)置為非阻塞. 這個不是接口上的要求, 而是 "工程實(shí)踐" 上的要求,畢竟我們需要一次性讀取全部數(shù)據(jù),在最后一次不能讀取的時候會阻塞在接口處。
插件組合
創(chuàng)建多個類:Epoll類、Sock類、Connection類、Log類
Epoll類
用來為我們保存并管理epoll模型。
static const unsigned int epoll_event_size_default = 64;
class Epoll
{
public:
Epoll(unsigned int epoll_event_size = epoll_event_size_default)
: _epoll_event_size(epoll_event_size)
{
_epoll_fd = epoll_create(254);
if (_epoll_fd == -1)
{
Log()(Fatal, "epoll_create fail:");
exit(-1);
}
_epoll_event = new epoll_event[_epoll_event_size];
}
struct epoll_event *bind_ready_ptr()
{
return _epoll_event;
}
int EpollCtl(int op, int fd, int event)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
int status = epoll_ctl(_epoll_fd, op, fd, &ev);
return status == 0;
}
int EpollWait(int timeout)
{
int n = epoll_wait(_epoll_fd, _epoll_event, _epoll_event_size, timeout);
return n;
}
int fds_numb()
{
return _epoll_event_size;
}
private:
int _epoll_fd;
struct epoll_event *_epoll_event;
unsigned int _epoll_event_size;
};
該類管理著,epoll模型文件描述符,_epoll_event第一個就緒結(jié)點(diǎn)地址、最大可以接收的 _epoll_event_size.
注意這里的_epoll_event,并不是實(shí)際在epoll模型中的自由結(jié)點(diǎn),而是該自由結(jié)點(diǎn)將重要信息拷貝到我們傳入的這個空間中。
傳入的event_size是告訴epoll模型我最多只能拷貝這么多個結(jié)點(diǎn)信息,還有就下次再說了,返回值是本次拷貝數(shù)量n。
Sock類
替我們來鏈接新鏈接的類
class Sock
{
public:
Sock(int gblock = 5)
: _listen_socket(socket(AF_INET, SOCK_STREAM, 0)), _gblock(gblock)
{
int opt = 1;
setsockopt(_listen_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof opt);
}
int get_listen_sock()
{
return _listen_socket;
}
void Sock_bind(const std::string &ip = "0.0.0.0", uint16_t port = 8080)
{
sockaddr_in self;
bzero(&self, sizeof(self));
self.sin_family = AF_INET;
self.sin_addr.s_addr = inet_addr(ip.c_str());
self.sin_port = htons(port);
if (0 > bind(_listen_socket, (sockaddr *)&self, sizeof(self)))
{
log(Fatal, "bind 致命錯誤[%d]", __TIME__);
exit(1);
}
}
void Sock_connect(const char *ip, const char *port)
{
struct sigaction s;
sockaddr_in server;
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
inet_aton(ip, &server.sin_addr);
server.sin_port = htons(atoi(port));
connect(_listen_socket, (sockaddr *)&server, sizeof(server));
}
void Sock_listen()
{
if (listen(_listen_socket, _gblock) > 0)
{
log(Fatal, "listen 致命錯誤[%d]", __TIME__);
exit(2);
}
}
int Sock_accept(std::string *ip, uint16_t *port)
{
sockaddr_in src;
bzero(&src, sizeof(src));
socklen_t srclen = sizeof(src);
int worksocket = accept(_listen_socket, (sockaddr *)&src, &srclen);
if (worksocket < 0)
{
log(Fatal, "link erron 鏈接失敗");
return -1;
}
*ip = inet_ntoa(src.sin_addr);
*port = ntohs(src.sin_port);
return worksocket;
}
~Sock()
{
if (_listen_socket >= 0)
close(_listen_socket);
}
private:
int _listen_socket;
const int _gblock;
};
圍繞著_listen_socket來操作的類
Log類
就是個日志沒啥文章來源:http://www.zghlxwxcb.cn/news/detail-829307.html
class Log
{
public:
Log()
{
std::cout<<"create log...\n"<<std::endl;
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch (level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
void printLog(int level, const std::string &logtxt)
{
switch (printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile, logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
break;
default:
break;
}
}
void printOneFile(const std::string &logname, const std::string &logtxt)
{
std::string _logname = path + logname;
std::cout<<_logname<<std::endl;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
if (fd < 0)
{
perror("fail:");
return;
}
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string &logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
printOneFile(filename, logtxt);
}
~Log()
{
}
void operator()(int level, const char *format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
// 格式:默認(rèn)部分+自定義部分
char logtxt[SIZE * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// printf("%s", logtxt); // 暫時打印
printLog(level, logtxt);
}
private:
int printMethod;
std::string path;
};
Connection類
using func_t = std::function<void(Connection *)>;
class Connection
{
public:
Connection(int sock, void *tsvr = nullptr) : _fd(sock), _tsvr(tsvr)
{
time_t _lasttime = (time_t)time(0);
}
bool SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
int _fd;
int _events;
// 三個回調(diào)方法,表征的就是對_sock進(jìn)行特定讀寫對應(yīng)的方法
func_t _recv_cb;
func_t _send_cb;
func_t _except_cb;
// 接收緩沖區(qū)&&發(fā)送緩沖區(qū)
std::string _inbuffer; // 暫時沒有辦法處理二進(jìn)制流,文本是可以的
std::string _outbuffer;
int _lasttime = 0;
std::string _client_ip;
uint16_t _client_port;
// 設(shè)置對epTcpServer的回值指針
void *_tsvr;
};
管理任何鏈接描述符(包括listen)的鏈接類,保存某個鏈接監(jiān)視的讀寫異常事件,并且保存這些事件發(fā)生后對應(yīng)的調(diào)用方法,并且每個事件設(shè)置讀寫應(yīng)用層緩沖區(qū),并且采用回值指針(在寫入數(shù)據(jù)后采用該指針通知上層下次該鏈接修改采用監(jiān)視事件條件。文章來源地址http://www.zghlxwxcb.cn/news/detail-829307.html
服務(wù)器代碼
#pragma once
#include "Log.hpp"
#include "sock.hpp"
#include "Epoll.hpp"
#include "Protocol.hpp"
#include <unordered_map>
#include <cassert>
#include <vector>
static const std::uint16_t server_port_defaut = 8080;
static const std::string server_ip_defaut = "0.0.0.0";
static const int READONE = 1024;
#define CLIENTDATA conn->_client_ip.c_str(),conn->_client_port
using callback_t = std::function<void(Connection *, std::string &)>;
class epTcpServer
{
static const std::uint16_t default_port = 8080;
static const std::uint16_t default_revs_num = 128;
public:
epTcpServer(int port = default_port, int revs_num = default_revs_num)
: _port(port), _epoll(default_revs_num), _revs_num(revs_num)
{
_sock.Sock_bind();
_sock.Sock_listen();
_listen = _sock.get_listen_sock();
AddConnection(_listen, std::bind(&epTcpServer::Accept, this, std::placeholders::_1), nullptr, nullptr);
_revs = _epoll.bind_ready_ptr();
cout << "debug 1" << endl;
}
void Dispather(callback_t cb)
{
_cb = cb;
while (true)
{
LoopOnce();
}
}
void EnableReadWrite(Connection *conn, bool readable, bool writeable)
{
uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0));
bool res = _epoll.EpollCtl(EPOLL_CTL_MOD, conn->_fd, events);
assert(res); // 更改成if
}
private:
void LoopOnce()
{
int n = _epoll.EpollWait(-1);
log(Info,"The number of links in this response :%d",n);
for (int i = 0; i < n; i++)
{
int sock = _revs[i].data.fd;
uint32_t revents = _revs[i].events;
log(Info, "Accessible fd:%d", sock);
bool status = IsConnectionExists(sock);
if (!status)
{
log(Error, "There is no such data in the hash sock:%d", sock);
continue;
}
if (revents & EPOLLIN)
{
if (_Connection_hash[sock]->_recv_cb != nullptr)
{
_Connection_hash[sock]->_recv_cb(_Connection_hash[sock]);
}
}
status = IsConnectionExists(sock);
if (revents & EPOLLOUT)
{
if (!status)
{
log(Warning, "in read closs sock:%d", sock);
continue;
}
if (_Connection_hash[sock]->_send_cb != nullptr)
_Connection_hash[sock]->_send_cb(_Connection_hash[sock]);
}
}
}
bool IsConnectionExists(int sock)
{
auto iter = _Connection_hash.find(sock);
if (iter == _Connection_hash.end())
return false;
else
return true;
}
void Accept(Connection *conn)
{
while (1)
{
std::string ip;
uint16_t port;
int work = _sock.Sock_accept(&ip, &port);
if (work < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if (errno == EINTR) // 信號中斷
continue; // 概率非常低
else
{
// accept失敗
log(Warning, "accept error, %d : %s", errno, strerror(errno));
break;
}
}
Connection *ret = AddConnection(work, std::bind(&epTcpServer::Read, this, std::placeholders::_1),
std::bind(&epTcpServer::Write, this, std::placeholders::_1),
std::bind(&epTcpServer::Except, this, std::placeholders::_1));
ret->_client_ip = ip;
ret->_client_port = port;
log(Info, "accept success && TcpServer success clinet[%s|%d]", ret->_client_ip.c_str(), ret->_client_port);
}
}
void Read(Connection *conn)
{
int cnt = 0;
while (1)
{
char buffer[READONE] = {0};
int n = recv(conn->_fd, buffer, sizeof(buffer) - 1, 0);
if (n < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
break; // 正常的
else if (errno == EINTR)
continue;
else
{
log(Error, "recv error, %d : %s", errno, strerror(errno));
conn->_except_cb(conn);
return;
}
}
else if (n == 0)
{
log(Debug, "client[%s|%d] quit, server close [%d]", CLIENTDATA, conn->_fd);
conn->_except_cb(conn);
return;
}
else
{
buffer[n] = 0;
conn->_inbuffer += buffer;
}
}
log(Info,"The data obtained from the client[%s|%d] is:%s",CLIENTDATA,conn->_inbuffer.c_str());
std::vector<std::string> messages;
SpliteMessage(conn->_inbuffer, &messages);
for (auto &msg : messages)
_cb(conn, msg);
}
void Write(Connection *conn)
{
printf("write back to client[%s|%d]:%s", conn->_client_ip.c_str(), conn->_client_port, conn->_outbuffer.c_str());
while (true)
{
ssize_t n = send(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
if (n > 0)
{
conn->_outbuffer.erase(0, n);
if (conn->_outbuffer.empty())
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if (errno == EINTR)
continue;
else
{
log(Error, "send error, %d : %s", errno, strerror(errno));
conn->_except_cb(conn);
break;
}
}
}
if (conn->_outbuffer.empty())
EnableReadWrite(conn, true, false);
else
EnableReadWrite(conn, true, true);
}
void Except(Connection *conn)
{
if (!IsConnectionExists(conn->_fd))
return;
// 1. 從epoll中移除
bool res = _epoll.EpollCtl(EPOLL_CTL_DEL, conn->_fd, 0);
assert(res); // 要判斷
// 2. 從我們的unorder_map中移除
_Connection_hash.erase(conn->_fd);
// 3. close(sock);
close(conn->_fd);
// 4. delete conn;
delete conn;
log(Debug, "Excepter 回收完畢,所有的異常情況");
}
Connection *AddConnection(int sock, func_t recv_cb, func_t send_cb, func_t except_cb, int sendevent = 0)
{
SetNonBlock(sock);
Connection *conn = new Connection(sock, this);
conn->SetCallBack(recv_cb, send_cb, except_cb);
_epoll.EpollCtl(EPOLL_CTL_ADD, sock, EPOLLIN | EPOLLET | sendevent);
_Connection_hash[sock] = conn;
return conn;
}
bool SetNonBlock(int sock)
{
int fl = fcntl(sock, F_GETFL);
if (fl < 0)
return false;
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
return true;
}
private:
int _listen;
int _port;
int _revs_num;
zjy::Sock _sock;
zjy::Epoll _epoll;
std::unordered_map<int, Connection *> _Connection_hash;
callback_t _cb;
struct epoll_event *_revs;
};
到了這里,關(guān)于基于epoll實(shí)現(xiàn)Reactor服務(wù)器的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!