本篇基于上篇代碼繼續(xù)改進(jìn),很長(zhǎng)。關(guān)于Reactor的說(shuō)明在后一篇
1、完善Epoll簡(jiǎn)單服務(wù)器
上面的代碼在處理讀事件時(shí),用的request數(shù)組是臨時(shí)的,如果有數(shù)據(jù)沒(méi)讀完,那么下次再來(lái)到這里,就沒(méi)有這些數(shù)據(jù)了。所以得讓每一個(gè)fd都有自己的緩沖區(qū)。建立一個(gè)Connection類(lèi),然后有一個(gè)map結(jié)構(gòu),讓這個(gè)類(lèi)和每個(gè)fd建立映射。Start函數(shù)改一下,不管超時(shí)還是出錯(cuò),就只處理數(shù)據(jù),處理的部分交給HandlerEvent,改名成LoopOnce,也就是說(shuō),Start那里還是有循環(huán),每次循環(huán)都去執(zhí)行L函數(shù),L函數(shù)用Wait提取一次,然后處理。
void Start()
{
//1、將listensock添加到epoll中,要先有epoll模型
bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);//只關(guān)心讀事件
assert(r);//可以做別的判斷
(void)r;
struct epoll_event revs_[gnum];
int timeout = 1000;
while(true)
{
LoopOnce(timeout);
}
}
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已經(jīng)連上服務(wù)器了", clientip.c_str(), clientport);
// 還不能recv,即使有了連接但也不知道有沒(méi)有數(shù)據(jù)
// 只有epoll知道具體情況,所以將sock添加到epoll中
bool r = epoller_.AddEvent(sock, EPOLLIN);
assert(r);
(void)r;
}
void Recver(int fd)
{
char request[1024];
ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
if (s > 0)
{
request[s - 1] = 0; // 對(duì)打印格式
request[s - 2] = 0; // 做一下調(diào)整
std::string response = func_(request);
send(fd, response.c_str(), response.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ...");
else
logMessage(Warning, "recv error, client quit...");
close(fd);
// 將文件描述符移除
// 在處理異常的時(shí)候,fd必須合法才能被處理
epoller_.DelEvent(fd);
}
}
void LoopOnce(int timeout)
{
int n = epoller_.Wait(revs_, gnum, timeout);
for(int i = 0; i < n; i++)
{
int fd = revs_[i].data.fd;
uint32_t events = revs_[i].events;
logMessage(Debug, "當(dāng)前正在處理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
if(events & EPOLLIN)//判斷讀事件就緒
{
if (fd == listensock_.Fd())
{
// 1、新連接到來(lái)
Accepter();
}
else
{
// 2、讀事件
Recver(fd);
}
}
}
}
class Connection
{
public:
Connection(int fd): fd_(fd)
{}
~Connection()
{}
public:
int fd_;
std::string inbuffer_;
std::string outbuffer_;
};
std::unordered_map<int, Connection*> connections_;
把Start的初始化任務(wù)交給InitServer
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
logMessage(Debug, "init server success");
//為listensock創(chuàng)建對(duì)應(yīng)的connection對(duì)象
Connection* conn = new Connection(listensock_.Fd());
//將listensock和connection對(duì)象添加到connections_
connections_.insert(std::pair<int, Connection*>(listensock_.Fd(), conn));
//將listensock添加到epoll中
bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);
assert(r);
(void)r;
}
void Start()
{
struct epoll_event revs_[gnum];
int timeout = 1000;
while(true)
{
LoopOnce(timeout);
}
}
同樣地,Accepter有添加到epoll的fd也要映射上自己的Connection類(lèi),Recver那里就可以也改一下了
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已經(jīng)連上服務(wù)器了", clientip.c_str(), clientport);
// 還不能recv,即使有了連接但也不知道有沒(méi)有數(shù)據(jù)
// 只有epoll知道具體情況,所以將sock添加到epoll中
Connection* conn = new Connection(sock);
connections_.insert(std::pair<int, Connection*>(sock, conn));
bool r = epoller_.AddEvent(sock, EPOLLIN);
assert(r);
(void)r;
}
void Recver(int fd)
{
char request[1024];
ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
if (s > 0)
{
request[s - 1] = 0; // 對(duì)打印格式
request[s - 2] = 0; // 做一下調(diào)整
connections_[fd]->inbuffer_ += request;
std::string response = func_(request);
send(fd, response.c_str(), response.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ...");
else
logMessage(Warning, "recv error, client quit...");
close(fd);
// 將文件描述符移除
// 在處理異常的時(shí)候,fd必須合法才能被處理
epoller_.DelEvent(fd);
}
}
所有就緒的fd,不只包含我們關(guān)心的fd,都要有Connection類(lèi)。Accepter那里,得到連接后,獲取套接字,不直接讀取,因?yàn)椴恢朗欠裼袛?shù)據(jù),就交給epoll,不過(guò)獲取套接字后,每個(gè)套接字都需要正確讀取自己的報(bào)文,所以Connection有了兩個(gè)buffer。
所有就緒的fd,不僅要有Connection類(lèi),還要被epoll管理。但這樣的代碼并不高效,刪除的時(shí)候要從epoll里刪,還要從connections_里刪,且代碼也不夠簡(jiǎn)潔。
封裝并修改一下形式
class Connection
{
public:
Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
: fd_(fd), clientip_(clientip), clientport_(clientport)
{}
~Connection()
{}
public:
int fd_;
std::string inbuffer_;
std::string outbuffer_;
std::string clientip_;
uint16_t clientport_;
};
//...
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
//為listensock創(chuàng)建對(duì)應(yīng)的connection對(duì)象
//將listensock和connection對(duì)象添加到connections_
//將listensock添加到epoll中
AddConnection(listensock_.Fd(), EPOLLIN);
logMessage(Debug, "init server success");
}
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//1、構(gòu)建connection對(duì)象,交給connections_管理
Connection* conn = new Connection(fd, ip, port);
connections_.insert(std::pair<int, Connection*>(fd, conn));
//2、fd和events寫(xiě)到內(nèi)核中
bool r = epoller_.AddEvent(fd, events);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已經(jīng)連上服務(wù)器了", clientip.c_str(), clientport);
// 還不能recv,即使有了連接但也不知道有沒(méi)有數(shù)據(jù)
// 只有epoll知道具體情況,所以將sock添加到epoll中
AddConnection(sock, EPOLLIN, clientip, clientport);
}
void Recver(int fd)
{
char request[1024];
ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
if (s > 0)
{
request[s - 1] = 0; // 對(duì)打印格式
request[s - 2] = 0; // 做一下調(diào)整
connections_[fd]->inbuffer_ += request;
std::string response = func_(request);
send(fd, response.c_str(), response.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ...");
else
logMessage(Warning, "recv error
// 在處理異常的時(shí)候,fd必須合法才能被處理
epoller_.DelEvent(fd);
}
}
2、打造統(tǒng)一的分開(kāi)處理的體系
現(xiàn)有的Accepter、Recver都是處理寫(xiě)事件的,LoopOnce那里可以加個(gè)讀事件的判斷,但相關(guān)的處理函數(shù)要怎么寫(xiě)?為了簡(jiǎn)便,這里再引入回調(diào)函數(shù)。
const static int gport = 8888;
class Connection;
using func_t = std::function<std::string (std::string)>;
using callback_t = std::function<void(Connection*)>;
class Connection
{
public:
Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
: fd_(fd), clientip_(clientip), clientport_(clientport)
{}
void Register(callback_t recver, callback_t sender, callback_t excepter)
{
recver_ = recver;
sender_ = sender;
excepter_ = excepter;
}
~Connection()
{}
public:
//IO信息
int fd_;
std::string inbuffer_;
std::string outbuffer_;
//IO處理
callback_t recver_;
callback_t sender_;
callback_t excepter_;
//用戶(hù)信息
std::string clientip_;
uint16_t clientport_;
};
Register為注冊(cè)方法,也就是要使用的方法。在AddConnection函數(shù)中,要判斷一下,是我們關(guān)心的和不是我們關(guān)心的,都調(diào)用注冊(cè)方法,但傳的參數(shù)不一樣。
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//2、構(gòu)建connection對(duì)象,交給connections_管理
Connection* conn = new Connection(fd, ip, port);
if(fd == listensock_.Fd())
{
conn->Register();
}
else
{
conn->Register();
}
connections_.insert(std::pair<int, Connection*>(fd, conn));
//3、fd和events寫(xiě)到內(nèi)核中
bool r = epoller_.AddEvent(fd, events);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
Accepter那里,里面有AddConnection函數(shù)。當(dāng)LoopOnce調(diào)用Accepter時(shí),這個(gè)函數(shù)也要用回調(diào)函數(shù),這樣就是一個(gè)類(lèi)的成員函數(shù)要調(diào)用另一個(gè)類(lèi)的回調(diào)函數(shù)。
void Accepter(Connection* conn)
{
(void) conn;//先閑置不用
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已經(jīng)連上服務(wù)器了", clientip.c_str(), clientport);
// 還不能recv,即使有了連接但也不知道有沒(méi)有數(shù)據(jù)
// 只有epoll知道具體情況,所以將sock添加到epoll中
AddConnection(sock, EPOLLIN, clientip, clientport);
}
AddConnection中,Regsiter三個(gè)參數(shù)都是callback_t類(lèi)型的,我們可以這樣寫(xiě)
if(fd == listensock_.Fd())
{
conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
這樣設(shè)置,當(dāng)我們關(guān)心的套接字上有事件就緒時(shí),讀方法就綁定Accepter。是其它套接字的話
else
{
conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
std::bind(&EpollServer::Sender, this, std::placeholders::_1),
std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
}
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//2、構(gòu)建connection對(duì)象,交給connections_管理
Connection* conn = new Connection(fd, ip, port);
if(fd == listensock_.Fd())
{
conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
else
{
conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
std::bind(&EpollServer::Sender, this, std::placeholders::_1),
std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
}
connections_.insert(std::pair<int, Connection*>(fd, conn));
//3、fd和events寫(xiě)到內(nèi)核中
bool r = epoller_.AddEvent(fd, events);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
Recver和Sender函數(shù)要傳的參數(shù)都是Connection* conn,Sender和Excepter下面再寫(xiě)。這樣的設(shè)計(jì)主要是為了更集中實(shí)現(xiàn)功能,代碼分明。
在LoopOnce就這樣寫(xiě):
void LoopOnce(int timeout)
{
int n = epoller_.Wait(revs_, gnum, timeout);
for(int i = 0; i < n; i++)
{
int fd = revs_[i].data.fd;
uint32_t events = revs_[i].events;
logMessage(Debug, "當(dāng)前正在處理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
if(events & EPOLLIN) connections_[fd]->recver_(connections_[fd]);
if(events & EPOLLOUT) connections_[fd]->sender_(connections_[fd]);
if((events & EPOLLERR) || (events & EPOLLHUP)) connections_[fd]->excepter_(connections_[fd]);
}
}
這樣就形成了一整個(gè)體系。寫(xiě)事件,讀事件,其它事件都有了處理。當(dāng)服務(wù)器啟動(dòng)后,服務(wù)器就監(jiān)聽(tīng)事件,一旦事件就緒,就會(huì)根據(jù)不同的事件類(lèi)型來(lái)派發(fā)事件到不同的Connection中,由Connection來(lái)調(diào)用對(duì)應(yīng)的函數(shù)來(lái)處理。
這時(shí)候,Start函數(shù)就是事件派發(fā)器,可以寫(xiě)為Disptcher()。接下來(lái)要寫(xiě)Recver、Sender、Excepter。
3、epoll工作模式
select,poll,epoll三個(gè),一旦有事件就緒,如果上層不取,底層就會(huì)一直通知事件就緒,這種模式叫做LT模式,水平觸發(fā)Level Triggered工作模式。epoll默認(rèn)LT,另有一個(gè)ET模式,邊緣觸發(fā)Edge Triggered工作模式,在數(shù)據(jù)變化時(shí)只通知一次,變化就是從無(wú)到有,從有到多。ET倒逼程序員必須一次將本輪數(shù)據(jù)全部讀取完畢,怎樣保證讀完?可以循環(huán)讀取,直到某次讀取的數(shù)量比每次要的量少,比如等于0或者小于這個(gè)數(shù),就說(shuō)明讀完了;但因?yàn)閞ecv/read是默認(rèn)阻塞的,所以循環(huán)讀取可能阻塞住,比如讀完幾次后剛好全部讀完,那么下次讀取就阻塞了,所以ET模式下,所有的讀取和寫(xiě)入都必須是非阻塞的接口。
LT也可以在非阻塞的情況寫(xiě)入,讀取,當(dāng)然也可以在阻塞模式下工作。但LT也不能代替ET,因?yàn)榇a無(wú)法統(tǒng)一起來(lái),而ET只能是非阻塞,ET倒逼程序員寫(xiě)成它自己的形式。ET通知效率 >= LT,IO效率也是一樣。
一次通知就是一次系統(tǒng)調(diào)用返回,一次返回必定對(duì)應(yīng)一次調(diào)用,ET能有效減少系統(tǒng)調(diào)用次數(shù)。ET倒逼程序員盡快取走數(shù)據(jù)的本質(zhì)是讓TCP底層更新出更大的接收窗口,以較大概率地增加對(duì)方的滑動(dòng)窗口的大小,提高發(fā)送效率。
ET并非能替代LT,ET適合高IO場(chǎng)景,LT能夠讀一部分就處理一部分,ET必須得讀完才行。epoll接口默認(rèn)LT。
4、ET模式
ET的設(shè)置是一個(gè)宏,EPOLLET。在我們的InitServer初始化函數(shù)中加上這個(gè)宏就行。下面也放了AddConnection的代碼。
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
//為listensock創(chuàng)建對(duì)應(yīng)的connection對(duì)象
//將listensock和connection對(duì)象添加到connections_
//將listensock添加到epoll中
AddConnection(listensock_.Fd(), EPOLLIN | EPOLLET);
logMessage(Debug, "init server success");
}
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//2、構(gòu)建connection對(duì)象,交給connections_管理
Connection* conn = new Connection(fd, ip, port);
if(fd == listensock_.Fd())
{
conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
else
{
conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1),
std::bind(EpollServer::Sender, this, std::placeholders::_1),
std::bind(EpollServer::Excepter, this, std::placeholders::_1));
}
connections_.insert(std::pair<int, Connection*>(fd, conn));
//3、fd和events寫(xiě)到內(nèi)核中
bool r = epoller_.AddEvent(fd, events);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
除此之外,監(jiān)聽(tīng)的套接字也得設(shè)置成非阻塞,用fcntl接口。寫(xiě)一個(gè)Util.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
class Util
{
public:
static bool SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if(fl < 0) return false;
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
return true;
}
};
在EpollServer.hpp中
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//1、設(shè)置fd非阻塞
if(events & EPOLLET) Util::SetNonBlock(fd);
//2、構(gòu)建connection對(duì)象,交給connections_管理
Connection* conn = new Connection(fd, ip, port);
Accepter里
AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);
5、繼續(xù)完善,處理寫(xiě)事件
現(xiàn)在的代碼,從InitServer,傳listensock_.Fd()到AddConnection中,然后會(huì)調(diào)用Accetper函數(shù),但只能讀一個(gè)連接,得改成循環(huán)的。給Connection類(lèi)加一個(gè)成員uint32_t events,AddConnection函數(shù)中在插入connections_數(shù)組前給conn->events賦值,Accepter函數(shù)中傳過(guò)來(lái)的參數(shù)是conn,通過(guò)events來(lái)判斷是否需要循環(huán)。
class Connection
{
public:
Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
: fd_(fd), clientip_(clientip), clientport_(clientport)
{}
void Register(callback_t recver, callback_t sender, callback_t excepter)
{
recver_ = recver;
sender_ = sender;
excepter_ = excepter;
}
~Connection()
{}
public:
//IO信息
int fd_;
std::string inbuffer_;
std::string outbuffer_;
//IO處理
callback_t recver_;
callback_t sender_;
callback_t excepter_;
//用戶(hù)信息
std::string clientip_;
uint16_t clientport_;
uint32_t events;
};
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//1、設(shè)置fd非阻塞
if(events & EPOLLET) Util::SetNonBlock(fd);
//2、構(gòu)建connection對(duì)象,交給connections_管理
Connection* conn = new Connection(fd, ip, port);
if(fd == listensock_.Fd())
{
conn->Register(std::bind(EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
else
{
conn->Register(std::bind(EpollServer::Recver, this, std::placeholders::_1),
std::bind(EpollServer::Sender, this, std::placeholders::_1),
std::bind(EpollServer::Excepter, this, std::placeholders::_1));
}
conn->events = events;
connections_.insert(std::pair<int, Connection*>(fd, conn));
//3、fd和events寫(xiě)到內(nèi)核中
bool r = epoller_.AddEvent(fd, events);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
void Accepter(Connection* conn)
{
do
{
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return;
logMessage(Debug, "%s:%d 已經(jīng)連上服務(wù)器了", clientip.c_str(), clientport);
// 還不能recv,即使有了連接但也不知道有沒(méi)有數(shù)據(jù)
// 只有epoll知道具體情況,所以將sock添加到epoll中
AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);
} while (conn->events & EPOLLET);//如果是ET模式就循環(huán),不是就退出
}
是我們需要的,就走Accetper方法,不是就走Recver,Sender,Excepter方法。上面Accepter函數(shù)中的Accept函數(shù)有返回值,這里的處理就是如果出錯(cuò)就返回,不過(guò)現(xiàn)在得處理出錯(cuò),如果不出錯(cuò)才能繼續(xù)走AddConnection函數(shù)。先在Sock.hpp中改一下,加上err參數(shù),err = errno。
int Accept(std::string* clientip, uint16_t* clientport, int* err)
{
struct sockaddr_in temp;
socklen_t len = sizeof(temp);
int sock = accept(_sock, (struct sockaddr*)&temp, &len);
*err = errno;
if(sock < 0)
{
logMessage(Warning, "accept error, code: %d, errstring: %s", errno, strerror(errno));
}
else
{
*clientip = inet_ntoa(temp.sin_addr);//這個(gè)函數(shù)就可以從結(jié)構(gòu)體中拿出ip地址,轉(zhuǎn)換好后返回
*clientport = ntohs(temp.sin_port);
}
return sock;
}
Accepter函數(shù)
void Accepter(Connection* conn)
{
do
{
int err = 0;
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport, &err);
if (sock < 0)
{
logMessage(Debug, "%s:%d 已經(jīng)連上服務(wù)器了", clientip.c_str(), clientport);
// 還不能recv,即使有了連接但也不知道有沒(méi)有數(shù)據(jù)
// 只有epoll知道具體情況,所以將sock添加到epoll中
AddConnection(sock, EPOLLIN | EPOLLET, clientip, clientport);
}
else
{
if(err == EAGAIN || err == EWOULDBLOCK) break;//讀完了,緩沖區(qū)滿(mǎn)了
else if(err == EINTR) continue;//有信號(hào)暫時(shí)中斷,后續(xù)還得繼續(xù)讀
else//異常,本次獲取連接失敗,繼續(xù)讀下一個(gè)連接
{
logMessage(Warning, "errstring: %s, errcode: %d", strerror(err), err);
continue;
}
}
} while (conn->events & EPOLLET);//如果是ET模式就循環(huán),不是就退出
logMessage(Debug, "accepter done ...");
}
再完成Recver,Sender,Excepter函數(shù)
void Recver(Connection* conn)
{
//讀取完了本輪數(shù)據(jù)
do
{
char buffer[bsize];//1024
ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0);
if(n > 0)
{
buffer[n] = 0;
conn->inbuffer_ += buffer;
//根據(jù)基本協(xié)議,進(jìn)行數(shù)據(jù)分析,邊讀取邊分析
}
else if(n == 0)//另一端關(guān)閉了套接字,要關(guān)閉連接
{
conn->excepter_(conn);//歸到異常處理
}
else
{
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
else if(errno == EINTR) continue;
else conn->excepter_(conn);
}
} while (conn->events & EPOLLET);
//根據(jù)基本協(xié)議,進(jìn)行數(shù)據(jù)分析
}
分析數(shù)據(jù)可以全讀完再分析,也可以邊讀邊分析,這就需要有協(xié)議規(guī)定,協(xié)議在之前有寫(xiě)過(guò)簡(jiǎn)單的代碼?,F(xiàn)在有3種情況會(huì)調(diào)用異常處理函數(shù)Excpter,Recver函數(shù)讀的時(shí)候異常, Sender函數(shù)發(fā)送數(shù)據(jù)出現(xiàn)異常,LoopOnce里也有。這樣情況有些多,代碼寫(xiě)起來(lái)也不夠好。改一下
void LoopOnce(int timeout)
{
int n = epoller_.Wait(revs_, gnum, timeout);
for(int i = 0; i < n; i++)
{
int fd = revs_[i].data.fd;
uint32_t events = revs_[i].events;
logMessage(Debug, "當(dāng)前正在處理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
//下面這句就是把所有異常情況都轉(zhuǎn)化為Recver和Sender去調(diào)用異常函數(shù)
if((events & EPOLLERR) || (events & EPOLLHUP)) events |= (EPOLLIN | EPOLLOUT);
//下面這兩個(gè)也要改一下,要保證連接存在
if((events & EPOLLIN) && ConnIsExists(fd))
connections_[fd]->recver_(connections_[fd]);
if((events & EPOLLOUT) && ConnIsExists(fd))
connections_[fd]->sender_(connections_[fd]);
}
}
bool ConnIsExists(int fd)
{
return connections_.find(fd) != connections_.end();
}
6、引入自定義協(xié)議,處理寫(xiě)事件
用之前的Util.hpp和Protocol.hpp,有序列化反序列化,所以Makefile里得加上-ljsoncpp
epollserver:Main.cc
g++ -o $@ $^ -ljsoncpp -std=c++11
.PHONY:clean
clean:
rm -f epollserver
Util.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <string>
#include <vector>
#include <cstdlib>
using namespace std;
class Util
{
public:
static bool SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if(fl < 0) return false;
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
return true;
}
static bool StringSplit(const std::string& str, const std::string& sep, std::vector<std::string>* result)
{
size_t start = 0;
while(start < str.size())
{
auto pos = str.find(sep, start);
if(pos == std::string::npos) break;
result->push_back(str.substr(start, pos - start));
start = pos + sep.size();
}
if(start < str.size()) result->push_back(str.substr(start));
return true;
}
static int toInt(const std::string& s)
{
return atoi(s.c_str());
}
};
Protocol.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <jsoncpp/json/json.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "Util.hpp"
//#define MESELF 1
//給網(wǎng)絡(luò)版本計(jì)算機(jī)定制協(xié)議
namespace protocol_ns
{
#define SEP " "
#define SEP_LEN strlen(SEP)//不能用sizeof
#define HEADER_SEP "\r\n"
#define HEADER_SEP_LEN strlen("\r\n")
//"長(zhǎng)度"\r\n"_x_op_y"\r\n
//假設(shè)報(bào)文是這樣的: "7"\r\n"10 + 20"\r\n,這就相當(dāng)于報(bào)頭 + 有效載荷
//請(qǐng)求/響應(yīng) = 報(bào)頭\r\n有效載荷\r\n,只是請(qǐng)求和響應(yīng)的有效載荷不同
std::string AddHeader(const std::string &str)
{
std::cout << "AddHeader 之前:\n"
<< str << std::endl;
std::string s = std::to_string(str.size());
s += HEADER_SEP;
s += str;
s += HEADER_SEP;
std::cout << "AddHeader 之hou:\n"
<< s << std::endl;
return s;
}
std::string RemoveHeader(const std::string& str, int len)
{
std::cout << "RemoveHeader 之前:\n"
<< str << std::endl;
std::string res = str.substr(str.size() - HEADER_SEP_LEN - len, len);
std::cout << "RemoveHeader 之后:\n"
<< str << std::endl;
return res;
}
int ReadPackage(int sock, std::string& inbuffer, std::string* package)
{
std::cout << "ReadPackage inbuffer 之前:\n"
<< inbuffer << std::endl;
//讀取 ———— 字符串"7"\r\n"10 + 20"\r\n
char buffer[1024];
ssize_t s = recv(sock, buffer, sizeof(buffer - 1), 0);
if(s <= 0) return -1;
buffer[s] = 0;
inbuffer += buffer;//此時(shí)inbuffer里就有了這樣的字符串: "7"\r\n"10 + 20"\r\n
//分析
auto pos = inbuffer.find(HEADER_SEP);
if(pos == std::string::npos) return 0;//沒(méi)找到\r\n那么就不是正確的字符串,不動(dòng)inbuffer里的內(nèi)容,退出
std::string lenStr = inbuffer.substr(0, pos);//獲取頭部字符串7
int len = Util::toInt(lenStr);//得到了長(zhǎng)度7,也就是有效載荷長(zhǎng)度
int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有報(bào)文的字符串長(zhǎng)度就是這個(gè)
if(inbuffer.size() < targetPackagelen) return 0;
//提取報(bào)文有效載荷
*package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符號(hào)的工作交給RemoveHeader
inbuffer.erase(0, targetPackagelen);//只有到這里才改變inbuffer里的內(nèi)容,從inbuffer里直接移除整個(gè)報(bào)文
std::cout << "ReadPackage inbuffer 之后:\n"
<< inbuffer << std::endl;
return len;//len就是有效載荷的長(zhǎng)度
}
class Request
{
public:
Request() {}//為無(wú)參構(gòu)造而準(zhǔn)備的,這樣就是一個(gè)無(wú)參一個(gè)有參
Request(int x, int y, char op)
: _x(x), _y(y), _op(op)
{}
bool Serialize(std::string* outstr)//序列化:結(jié)構(gòu)體轉(zhuǎn)字符串
{
*outstr = "";
#ifdef MYSELF
std::string x_string = std::to_string(_x);
std::string y_string = std::to_string(_y);
// 手動(dòng)序列化
*outstr = x_string + SEP + _op + SEP + y_string;
#else
Json::Value root;//Value是一個(gè)萬(wàn)能對(duì)象,接受任何一個(gè)kv類(lèi)型
root["x"] = _x;
root["y"] = _y;//所有放進(jìn)去的會(huì)自動(dòng)轉(zhuǎn)為string類(lèi)型
root["op"] = _op;
//Json::FastWriter writer;//FastWriter用來(lái)序列化,把結(jié)構(gòu)化的數(shù)據(jù)轉(zhuǎn)為字符串類(lèi)型
Json::StyledWriter writer;
*outstr = writer.write(root);
#endif
return true;
}
bool Deserialize(const std::string& instr)//反序列化:字符串轉(zhuǎn)結(jié)構(gòu)體
{
#ifdef MYSELF
std::vector<std::string> result;
Util::StringSplit(instr, SEP, &result);
if (result.size() != 3)
return false;
_x = Util::toInt(result[0]);
_y = Util::toInt(result[2]);
if (result[1].size() == 1)
return false; // 協(xié)議規(guī)定
_op = result[1][0]; // 因?yàn)槭亲址?,所以只要一個(gè)符號(hào)即可
std::cout << "_x: \n"
<< _x << "_y: \n"
<< _y << "_op: " << _op << std::endl;
#else
Json::Value root;
Json::Reader reader;//Reader用來(lái)反序列化
reader.parse(instr, root);
_x = root["x"].asInt();//拿到的是字符串,要轉(zhuǎn)成int類(lèi)型
_y = root["y"].asInt();
//_op雖然是char,但它在計(jì)算機(jī)里就是整數(shù),序列化時(shí)放進(jìn)root的就是整數(shù)類(lèi)型,反序列化時(shí)轉(zhuǎn)成int類(lèi)型,然后編譯器會(huì)根據(jù)char類(lèi)型自動(dòng)解釋成char類(lèi)型
_op = root["op"].asInt();
#endif
return true;
}
~Request() {}
public:
int _x;
int _y;
char _op;
};
class Response
{
public:
Response() {}
Response(int result, int code)
: _result(result), _code(code)
{}
bool Serialize(std::string* outstr)
{
*outstr = "";
#ifdef MYSELF
std::string res_string = std::to_string(_result);
std::string code_string = std::to_string(_code);
// 手動(dòng)序列化
*outstr = res_string + SEP + code_string;
#else
Json::Value root;
root["result"] = _result;
root["code"] = _code;
//Json::FastWriter writer;
Json::StyledWriter writer;
*outstr = writer.write(root);
#endif
return true;
}
bool Deserialize(const std::string& instr)
{
#ifdef MYSELF
std::vector<std::string> result;
Util::StringSplit(instr, SEP, &result);
if (result.size() != 2)
return false;
_result = Util::toInt(result[0]);
_code = Util::toInt(result[1]);
std::cout << "_result: \n"
<< _result << "_code: " << _code << std::endl;
#else
Json::Value root;
Json::Reader reader;
reader.parse(instr, root);
_result = root["result"].asInt();
_code = root["code"].asInt();
#endif
return true;
}
~Response() {}
public:
int _result;
int _code;//0表示計(jì)算成功,剩余的數(shù)字就是各種非法操作的錯(cuò)誤碼
};
}
ReadPackage改一下,之前是接收并分析,現(xiàn)在只做分析
int ParsePackage(std::string& inbuffer, std::string* package)
{
std::cout << "ReadPackage inbuffer 之前:\n"
<< inbuffer << std::endl;
//分析
auto pos = inbuffer.find(HEADER_SEP);
if(pos == std::string::npos) return 0;//沒(méi)找到\r\n那么就不是正確的字符串,不動(dòng)inbuffer里的內(nèi)容,退出
std::string lenStr = inbuffer.substr(0, pos);//獲取頭部字符串7
int len = Util::toInt(lenStr);//得到了長(zhǎng)度7,也就是有效載荷長(zhǎng)度
int targetPackagelen = lenStr.size() + len + 2 * HEADER_SEP_LEN;//接收到的有報(bào)文的字符串長(zhǎng)度就是這個(gè)
if(inbuffer.size() < targetPackagelen) return 0;
//提取報(bào)文有效載荷
*package = inbuffer.substr(0, targetPackagelen);//package保存了"7"\r\n"10 + 20"\r\n,去掉其它符號(hào)的工作交給RemoveHeader
inbuffer.erase(0, targetPackagelen);//只有到這里才改變inbuffer里的內(nèi)容,從inbuffer里直接移除整個(gè)報(bào)文
std::cout << "ReadPackage inbuffer 之后:\n"
<< inbuffer << std::endl;
return len;//len就是有效載荷的長(zhǎng)度
}
繼續(xù)寫(xiě)EpollServer.hpp中的Recver和Sender函數(shù)
void Recver(Connection* conn)
{
//讀取完了本輪數(shù)據(jù)
do
{
char buffer[bsize];//1024
ssize_t n = recv(conn->fd_, buffer, sizeof(buffer) - 1, 0);
if(n > 0)
{
buffer[n] = 0;
conn->inbuffer_ += buffer;
//根據(jù)基本協(xié)議,進(jìn)行數(shù)據(jù)分析,邊讀取邊分析;
std::string requestStr;
int n = ParsePackage(conn->inbuffer_, &requestStr);
//看ParsePackage
//n為0表示沒(méi)有不合理字符串或者inbuffer剩下的不夠規(guī)定的長(zhǎng)度,不用判斷
if(n > 0)//保證讀到了完整的請(qǐng)求
{
//回調(diào)函數(shù)在Main.cc中
//這邊先反序列化,再交給回調(diào)函數(shù)
//上面改成using func_t = std::function<void(const Request&)>;
requestStr = RemoveHeader(requestStr, n);
Request req;
req.Deserialize(requestStr);
func_(req);//交給回調(diào)函數(shù)處理
}
}
else if(n == 0)//另一端關(guān)閉了套接字,要關(guān)閉連接
{
conn->excepter_(conn);//歸到異常處理
}
else
{
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
else if(errno == EINTR) continue;
else conn->excepter_(conn);
}
} while (conn->events & EPOLLET);
}
更改在n > 0的判斷后
Main.cc
#include "EpollServer.hpp"
#include <memory>
//用之前網(wǎng)絡(luò)計(jì)算器的計(jì)算函數(shù)
Response CalculateHelper(const Request& req)
{
Response resp(0, 0);
switch(req._op)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '/':
if(req._y == 0) resp._code = 1;
else resp._result = req._x / req._y;
break;
case '%':
if(req._y == 0) resp._code = 2;
else resp._result = req._x / req._y;
break;
default:
resp._code = 3;
break;
}
return resp;
}
void Calculate(const Request& req)
{
Response resp = CalculateHelper(req);
//序列化,當(dāng)然這里放到EpollServer.hpp更好
std::string sendStr;
resp.Serialize(&sendStr);
sendStr = AddHeader(sendStr);
//序列化后發(fā)送出去
}
int main()
{
std::unique_ptr<EpollServer> svr(new EpollServer(Calculate));
svr->InitServer();
svr->Disptcher();
return 0;
}
epoll中關(guān)于fd的讀取,一般要常設(shè)置,也就是一直要讓epoll關(guān)心;關(guān)于fd的寫(xiě)入,則是按需設(shè)置,不能常設(shè)置,只有需要發(fā)送的時(shí)候才設(shè)置。發(fā)送的對(duì)象就是Connection中的outbuffer。
//EpollServer.hpp
using func_t = std::function<void(Connection *, const Request&)>;
//...
func_(conn, req);//Recver函數(shù)中
//Main.cc
void Calculate(Connection* conn, const Request& req)
{
Response resp = CalculateHelper(req);
//序列化,當(dāng)然這里放到EpollServer.hpp更好
std::string sendStr;
resp.Serialize(&sendStr);
//序列化后發(fā)送出去
conn->outbuffer_ += sendStr;
//開(kāi)啟對(duì)寫(xiě)事件的關(guān)心
}
加上寫(xiě)事件是要對(duì)內(nèi)核做操作,在EpollServer.hpp的EpollServer類(lèi)中中再添加一個(gè)函數(shù)專(zhuān)門(mén)做這個(gè)事,不過(guò)Main.cc中傳過(guò)來(lái)的參數(shù)只有Connection類(lèi)的,所有這個(gè)類(lèi)還得添加一個(gè)成員來(lái)調(diào)用這個(gè)函數(shù)
class EpollServer;
//...
EpollServer* R;
//AddConnection里conn->events = events后
conn->R = this;//函數(shù)屬于EpollServer類(lèi),this就是這個(gè)類(lèi)
//...
void Calculate(Connection* conn, const Request& req)
{
Response resp = CalculateHelper(req);
//序列化,當(dāng)然這里放到EpollServer.hpp更好
std::string sendStr;
resp.Serialize(&sendStr);
//序列化后發(fā)送出去
conn->outbuffer_ += sendStr;
//開(kāi)啟對(duì)寫(xiě)事件的關(guān)心
conn->R->EnableReadWrite(conn, true, true);
}
開(kāi)啟后,Epoll底層會(huì)調(diào)用Sender函數(shù)來(lái)發(fā)送。
void Sender(Connection* conn)
{
do
{
ssize_t n = send(conn->fd_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
//體現(xiàn)按需思路
if(n > 0)//發(fā)送成功,發(fā)送了局部或全部
{
conn->outbuffer_.erase(0, n);
if(conn->outbuffer_.empty())//把數(shù)據(jù)發(fā)完了
{
EnableReadWrite(conn, true, false);//去掉對(duì)寫(xiě)事件的關(guān)心
break;
}
else//沒(méi)發(fā)完,也就是發(fā)送了局部
{
EnableReadWrite(conn, true, true);//繼續(xù)
}
}
else
{
//和Accepter里的一樣的解釋
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
else if(errno == EINTR) continue;
else
{
conn->excepter_(conn);
break;
}
}
} while (conn->events & EPOLLET);//ET模式就一直循環(huán),不是就退出
}
通常初次設(shè)置對(duì)寫(xiě)事件的關(guān)心,發(fā)送緩沖區(qū)是空的會(huì),因此立馬觸發(fā)一次對(duì)應(yīng)的fd的就緒,此時(shí)epoll底層會(huì)自動(dòng)調(diào)用回調(diào)函數(shù),從而使用Sender函數(shù)。
實(shí)現(xiàn)EnableReadWrite函數(shù)
bool EnableReadWrite(Connection* conn, bool readable, bool writeable)
{
conn->events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
//修改Epoll.hpp中的AddEvent函數(shù)為AddModEvent,傳入一個(gè)op,用來(lái)實(shí)現(xiàn)Add和Mod兩個(gè)功能
return epoller_.AddModEvent(conn->fd_, conn->events, EPOLL_CTL_MOD);
}
bool AddModEvent(int fd, uint32_t events, int op)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;//屬于用戶(hù)的數(shù)據(jù),epoll底層不對(duì)該數(shù)據(jù)做任何修改,為了給未來(lái)就緒返回
int n = epoll_ctl(epfd_, op, fd, &ev);
if(n < 0)
{
logMessage(Fatal, "epoll_ctl error, code: %d, errstring: %s", errno, strerror(errno));
return false;
}
return true;
}
這樣就完成了服務(wù)器的拉取工作,也就是有數(shù)據(jù)來(lái)了,可以返回結(jié)果。
現(xiàn)在這樣的代碼是沒(méi)問(wèn)題的,但也有些復(fù)雜,我們希望暴露在外面的更少,所有工作都在底層完成,上層不需要關(guān)心,只調(diào)用接口就好。
本篇gitee
下一篇繼續(xù)寫(xiě)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-808020.html
結(jié)束。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-808020.html
到了這里,關(guān)于Linux學(xué)習(xí)記錄——??? 高級(jí)IO(5)--- Epoll型服務(wù)器(2)(Reactor)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!