目錄
1.Reactor模式
1.1.Reactor模式的定義
1.2.Reactor模式的角色構(gòu)成
1.3.Reactor模式的工作流程
2.epoll?ET服務(wù)器(Reactor模式)
2.1.epoll?ET服務(wù)器源代碼
2.2.epoll?ET服務(wù)器源代碼講解
2.2.1.設(shè)計(jì)思路
2.2.2.Connection結(jié)構(gòu)
2.2.3.TcpServer類
2.2.4.回調(diào)函數(shù)
2.2.5.套接字相關(guān)
2.2.6.Reactor模式和Proactor模式
1.Reactor模式
1.1.Reactor模式的定義
Reactor反應(yīng)器模式,也叫做分發(fā)者模式或通知者模式,是一種將就緒事件派發(fā)給對(duì)應(yīng)服務(wù)處理程序的事件設(shè)計(jì)模式。
1.2.Reactor模式的角色構(gòu)成
Reactor主要由以下五個(gè)角色構(gòu)成:
角色 解釋 Handle(句柄) 用于標(biāo)識(shí)不同的事件,本質(zhì)就是一個(gè)文件描述符。 Sychronous Event Demultiplexer(同步事件分離器) 本質(zhì)就是一個(gè)系統(tǒng)調(diào)用,用于等待事件的發(fā)生。對(duì)于Linux來(lái)說(shuō),同步事件分離器指的就是I/O多路復(fù)用,比如select、poll、epoll等。 Event Handler(事件處理器) 由多個(gè)回調(diào)方法構(gòu)成,這些回調(diào)方法構(gòu)成了與應(yīng)用相關(guān)的對(duì)于某個(gè)事件的處理反饋。 Concrete Event Handler(具體事件處理器) 事件處理器中各個(gè)回調(diào)方法的具體實(shí)現(xiàn)。 Initiation Dispatcher(初始分發(fā)器) 初始分發(fā)器實(shí)際上就是Reactor角色,初始分發(fā)器會(huì)通過(guò)同步事件分離器來(lái)等待事件的發(fā)生,當(dāng)對(duì)應(yīng)事件就緒時(shí)就調(diào)用事件處理器,最后調(diào)用對(duì)應(yīng)的回調(diào)方法來(lái)處理這個(gè)事件。
1.3.Reactor模式的工作流程
Reactor模式的工作流程如下:
1.當(dāng)應(yīng)用向初始分發(fā)器注冊(cè)具體事件處理器時(shí),應(yīng)用會(huì)標(biāo)識(shí)出該事件處理器希望初始分發(fā)器在某個(gè)事件發(fā)生時(shí)向其通知,該事件與Handle關(guān)聯(lián)。
2.初始分發(fā)器會(huì)要求每個(gè)事件處理器向其傳遞內(nèi)部的Handle,該Handle向操作系統(tǒng)標(biāo)識(shí)了事件處理器。
3.當(dāng)所有的事件處理器注冊(cè)完畢后,應(yīng)用會(huì)啟動(dòng)初始分發(fā)器的事件循環(huán),這時(shí)初始分發(fā)器會(huì)將每個(gè)事件處理器的Handle合并起來(lái),并使用同步事件分離器等待這些事件的發(fā)生。
4.當(dāng)某個(gè)事件處理器的Handle變?yōu)镽eady狀態(tài)時(shí),同步事件分離器會(huì)通知初始分發(fā)器。
5.初始分發(fā)器會(huì)將Ready狀態(tài)的Handle作為key,來(lái)尋找其對(duì)應(yīng)的事件處理器。
6.初始分發(fā)器會(huì)調(diào)用其對(duì)應(yīng)事件處理器當(dāng)中對(duì)應(yīng)的回調(diào)方法來(lái)響應(yīng)該事件。
2.epoll?ET服務(wù)器(Reactor模式)
2.1.epoll?ET服務(wù)器源代碼
創(chuàng)建TcpServer.hpp文件,寫入下圖一所示的代碼,創(chuàng)建Epoller.hpp文件,寫入下圖二所示的代碼,創(chuàng)建Log.hpp文件,寫入下圖三所示的代碼,創(chuàng)建Protocol.hpp文件,寫入下圖四所示的代碼,創(chuàng)建Service.hpp文件,寫入下圖五所示的代碼,創(chuàng)建Sock.hpp文件,寫入下圖六所示的代碼,創(chuàng)建Util.hpp文件,寫入下圖七所示的代碼,創(chuàng)建main.cc文件,寫入下圖八所示的代碼,創(chuàng)建Makefile文件,寫入下圖九所示的代碼,使用make命令生成server可執(zhí)行程序,使用./server 8080命令運(yùn)行server可執(zhí)行程序,創(chuàng)建兩個(gè)新選項(xiàng)卡作為客戶端,分別使用telnet 127.0.0.1 8080命令連接服務(wù)端,輸入ctrl+]進(jìn)入telnet行,然后回車并輸入消息內(nèi)容發(fā)送給服務(wù)端,服務(wù)端收到消息后進(jìn)行計(jì)算并將計(jì)算結(jié)果返回給客戶端,客戶端發(fā)送完消息后輸入ctrl+]進(jìn)入telnet行,然后輸入quit退出,如下圖十所示。
TcpServer.hpp文件:
#pragma once #include <iostream> #include <string> #include <vector> #include <cerrno> #include <unordered_map> #include <functional> #include "Sock.hpp" #include "Epoller.hpp" #include "Log.hpp" #include "Util.hpp" #include "Protocol.hpp" // 基于Reactor模式,編寫一個(gè)充分讀取和寫入的,EPOLL(ET)的Server class Connection; class TcpServer; using func_t = std::function<int(Connection *)>; using callback_t = std::function<int(Connection *, std::string &)>; // event class Connection { public: // 文件描述符 int sock_; TcpServer *R_; // 自己的接受和發(fā)送緩沖區(qū) std::string inbuffer_; std::string outbuffer_; // 回調(diào)函數(shù) func_t recver_; func_t sender_; func_t excepter_; public: Connection(int sock, TcpServer *r) : sock_(sock), R_(r) { } void SetRecver(func_t recver) { recver_ = recver; } void SetSender(func_t sender) { sender_ = sender; } void SetExcepter(func_t excepter) { excepter_ = excepter; } ~Connection() {} }; //Reactor //單進(jìn)程:半異步半同步 -- Reactor -- Linux服務(wù)最常用 -- 幾乎沒(méi)有之一 // Reactor(tcp)服務(wù)器, 即負(fù)責(zé)事件派發(fā), 又負(fù)責(zé)IO [又負(fù)責(zé)業(yè)務(wù)邏輯的處理] // Proactor: 前攝模式 -- 其他平臺(tái)可能出現(xiàn)的模式 // 只負(fù)責(zé)負(fù)責(zé)事件派發(fā),就緒的事件推送給后端的進(jìn)程、線程池, 不關(guān)心 負(fù)責(zé)IO [又負(fù)責(zé)業(yè)務(wù)邏輯的處理] class TcpServer { public: TcpServer(callback_t cb, int port = 8080) : cb_(cb) { revs_ = new struct epoll_event[revs_num]; // 網(wǎng)絡(luò)功能 listensock_ = Sock::Socket(); Util::SetNonBlock(listensock_); Sock::Bind(listensock_, port); Sock::Listen(listensock_); // 多路轉(zhuǎn)接 epfd_ = Epoller::CreateEpoller(); // 添加listensock匹配的connection AddConnection(listensock_, EPOLLIN | EPOLLET, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); } void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter) { if (event & EPOLLET) Util::SetNonBlock(sockfd); // 添加sockfd到epoll Epoller::AddEvent(epfd_, sockfd, event); // 將sockfd匹配的Connection也添加到當(dāng)前的unordered_map中 Connection *conn = new Connection(sockfd, this); conn->SetRecver(recver); conn->SetSender(sender); conn->SetExcepter(excepter); connections_.insert(std::make_pair(sockfd, conn)); logMessage(DEBUG, "添加新鏈接到connections成功: %d", sockfd); } int Accepter(Connection *conn) { // demo - listensock 也是工作在ET,來(lái)一個(gè)連接,對(duì)應(yīng)就有事件就緒,那么如何來(lái)一批呢? while (true) { std::string clientip; uint16_t clientport = 0; int sockfd = Sock::Accept(conn->sock_, &clientip, &clientport); if (sockfd < 0) { if (errno == EINTR) continue; else if (errno == EAGAIN || errno == EWOULDBLOCK) break; else { logMessage(WARNING, "accept error"); return -1; } } logMessage(DEBUG, "get a new link: %d", sockfd); // 注意:默認(rèn)我們只設(shè)置了讓epoll幫我們關(guān)心讀事件,沒(méi)有關(guān)心寫事件 // 為什么沒(méi)有關(guān)注寫事件:因?yàn)樽铋_始的時(shí)候,寫空間一定是就緒的! // 運(yùn)行中可能會(huì)存在條件不滿足 -- 寫空間被寫滿了 AddConnection(sockfd, EPOLLIN | EPOLLET, std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1), std::bind(&TcpServer::TcpSender, this, std::placeholders::_1), std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1)); } return 0; } int TcpRecver(Connection *conn) { // XXXXXXX\3XXXXXX\3 while (true) { char buffer[1024]; ssize_t s = recv(conn->sock_, buffer, sizeof(buffer) - 1, 0); if (s > 0) { buffer[s] = 0; conn->inbuffer_ += buffer; } else if (s == 0) { logMessage(DEBUG, "client quit"); conn->excepter_(conn); break; } else { if (errno == EINTR) continue; else if (errno == EAGAIN || errno == EWOULDBLOCK) break; else { // 出錯(cuò)了 logMessage(DEBUG, "recv error: %d:%s", errno, strerror(errno)); conn->excepter_(conn); break; } } } // 將本輪全部讀取完畢 std::vector<std::string> result; PackageSplit(conn->inbuffer_, &result); for (auto &message : result) { cb_(conn, message); } return 0; } int TcpSender(Connection *conn) { while(true) { ssize_t n = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0); if(n > 0) { // 去除已經(jīng)成功發(fā)送的數(shù)據(jù) conn->outbuffer_.erase(0, n); } else { if(errno == EINTR) continue; else if(errno == EAGAIN || errno == EWOULDBLOCK) break; //發(fā)完了,一定是outbuffer清空了嗎?不一定(EPOLLOUT打開) else { conn->excepter_(conn); logMessage(DEBUG, "send error: %d:%s", errno, strerror(errno)); break; } } } return 0; } int TcpExcepter(Connection *conn) { // 0. if(!IsExists(conn->sock_)) return -1; // 所有的服務(wù)器異常,都會(huì)被歸類到這里 // 坑:一定要先從epoll中移除,然后再關(guān)閉fd // 1. Epoller::DelEvent(epfd_, conn->sock_); logMessage(DEBUG, "remove epoll event!"); // 2. close(conn->sock_); logMessage(DEBUG, "close fd: %d", conn->sock_); // 3. delete conn; delete connections_[conn->sock_]; logMessage(DEBUG, "delete connection object done"); // 4. connections_.erase(conn->sock_); logMessage(DEBUG, "erase connection from connections"); return 0; } bool IsExists(int sock) { auto iter = connections_.find(sock); if (iter == connections_.end()) return false; else return true; } // 打開或者關(guān)閉對(duì)于特定socket是否要關(guān)心讀或者寫 //EnableReadWrite(sock, true, false); //EnableReadWrite(sock, true, true); void EnableReadWrite(int sock, bool readable, bool writeable) { uint32_t event = 0; event |= (readable ? EPOLLIN : 0); event |= (writeable ? EPOLLOUT : 0); Epoller::ModEvent(epfd_, sock, event); } // 根據(jù)就緒事件,將事件進(jìn)行事件派發(fā) void Dispatcher() { int n = Epoller::LoopOnce(epfd_, revs_, revs_num); for (int i = 0; i < n; i++) { int sock = revs_[i].data.fd; uint32_t revent = revs_[i].events; if(revent & EPOLLHUP) revent |= (EPOLLIN|EPOLLOUT); if(revent & EPOLLERR) revent |= (EPOLLIN|EPOLLOUT); if (revent & EPOLLIN) { if (IsExists(sock) && connections_[sock]->recver_) connections_[sock]->recver_(connections_[sock]); } if (revent & EPOLLOUT) { if (IsExists(sock) && connections_[sock]->sender_) connections_[sock]->sender_(connections_[sock]); } } } void Run() { while (true) { Dispatcher(); } } ~TcpServer() { if (listensock_ != -1) close(listensock_); if (epfd_ != -1) close(epfd_); delete[] revs_; } private: static const int revs_num = 64; // 1. 網(wǎng)絡(luò)socket int listensock_; // 2. epoll int epfd_; // 3. 將epoll和上層代碼進(jìn)行結(jié)合 std::unordered_map<int, Connection *> connections_; // 4. 就緒事件列表 struct epoll_event *revs_; // 5. 設(shè)置完整報(bào)文的處理方法 callback_t cb_; };
Epoller.hpp文件:
#pragma once #include <iostream> #include <cerrno> #include <cstdlib> #include <unistd.h> #include <sys/epoll.h> #include "Log.hpp" class Epoller { public: static const int gsize = 128; public: static int CreateEpoller() { int epfd = epoll_create(gsize); if (epfd < 0) { logMessage(FATAL, "epoll_create : %d : %s", errno, strerror(errno)); exit(3); } return epfd; } static bool AddEvent(int epfd, int sock, uint32_t event) { struct epoll_event ev; ev.events = event; ev.data.fd = sock; int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev); return n == 0; } static bool ModEvent(int epfd, int sock, uint32_t event) { struct epoll_event ev; ev.events = event; ev.data.fd = sock; int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev); return n == 0; } static bool DelEvent(int epfd, int sock) { int n = epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr); return n == 0; } static int LoopOnce(int epfd, struct epoll_event revs[], int num) { int n = epoll_wait(epfd, revs, num, -1); if(n == -1) { logMessage(FATAL, "epoll_wait : %d : %s", errno, strerror(errno)); } return n; } };
Log.hpp文件:
#pragma once #include <cstdio> #include <ctime> #include <cstdarg> #include <cassert> #include <cassert> #include <cstring> #include <cerrno> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #define DEBUG 0 #define NOTICE 1 #define WARNING 2 #define FATAL 3 const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"}; #define LOGFILE "serverTcp.log" class Log { public: Log():logFd(-1) {} void enable() { umask(0); logFd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666); assert(logFd != -1); dup2(logFd, 1); dup2(logFd, 2); } ~Log() { if(logFd != -1) { fsync(logFd); close(logFd); } } private: int logFd; }; // logMessage(DEBUG, "%d", 10); void logMessage(int level, const char *format, ...) { assert(level >= DEBUG); assert(level <= FATAL); char *name = getenv("USER"); char logInfo[1024]; va_list ap; // ap -> char* va_start(ap, format); vsnprintf(logInfo, sizeof(logInfo) - 1, format, ap); va_end(ap); // ap = NULL // 每次打開太麻煩 FILE *out = (level == FATAL) ? stderr : stdout; fprintf(out, "%s | %u | %s | %s\n", log_level[level], (unsigned int)time(nullptr), name == nullptr ? "unknow" : name, logInfo); fflush(out); // 將C緩沖區(qū)中的數(shù)據(jù)刷新到OS fsync(fileno(out)); // 將OS中的數(shù)據(jù)盡快刷盤 }
Protocol.hpp文件:
#pragma once #include <iostream> #include <vector> #include <cstring> #include <string> #include <cstdio> #define SEP 'X' #define SEP_LEN sizeof(SEP) #define CRLF "\r\n" #define CRLF_LEN strlen(CRLF) // 坑:sizeof(CRLF) #define SPACE " " #define SPACE_LEN strlen(SPACE) // bbbXcc void PackageSplit(std::string &inbuffer, std::vector<std::string> *result) { while (true) { std::size_t pos = inbuffer.find(SEP); if (pos == std::string::npos) break; result->push_back(inbuffer.substr(0, pos)); inbuffer.erase(0, pos + SEP_LEN); } } struct Request { int x; int y; char op; }; struct Response { int code; int result; }; bool Parser(std::string &in, Request *req) { // 1 + 1, 2 * 4, 5 * 9, 6 *1 std::size_t spaceOne = in.find(SPACE); if (std::string::npos == spaceOne) return false; std::size_t spaceTwo = in.rfind(SPACE); if (std::string::npos == spaceTwo) return false; std::string dataOne = in.substr(0, spaceOne); std::string dataTwo = in.substr(spaceTwo + SPACE_LEN); std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN)); if (oper.size() != 1) return false; // 轉(zhuǎn)成內(nèi)部成員 req->x = atoi(dataOne.c_str()); req->y = atoi(dataTwo.c_str()); req->op = oper[0]; return true; } void Serialize(const Response &resp, std::string *out) { // "exitCode_ result_" std::string ec = std::to_string(resp.code); std::string res = std::to_string(resp.result); *out = ec; *out += SPACE; *out += res; *out += CRLF; }
Service.hpp文件:
#pragma once #include "Protocol.hpp" #include <functional> using service_t = std::function<Response (const Request &req)>; static Response calculator(const Request &req) { Response resp = {0, 0}; switch (req.op) { case '+': resp.result = req.x + req.y; break; case '-': resp.result = req.x - req.y; break; case '*': resp.result = req.x * req.y; break; case '/': { // x_ / y_ if (req.y == 0) resp.code = -1; // -1. 除0 else resp.result = req.x / req.y; } break; case '%': { // x_ / y_ if (req.y == 0) resp.code = -2; // -2. 模0 else resp.result = req.x % req.y; } break; default: resp.code = -3; // -3: 非法操作符 break; } return resp; }
Sock.hpp文件:
#pragma once #include <iostream> #include <fstream> #include <string> #include <vector> #include <cstdio> #include <cstring> #include <signal.h> #include <unistd.h> #include <sys/socket.h> #include <sys/stat.h> #include <arpa/inet.h> #include <netinet/in.h> #include <sys/types.h> #include <sys/wait.h> #include <pthread.h> #include <cerrno> #include <cassert> class Sock { public: static const int gbacklog = 20; static int Socket() { int listenSock = socket(PF_INET, SOCK_STREAM, 0); if (listenSock < 0) { exit(1); } int opt = 1; setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); return listenSock; } static void Bind(int socket, uint16_t port) { struct sockaddr_in local; // 用戶棧 memset(&local, 0, sizeof local); local.sin_family = PF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = INADDR_ANY; // 2.2 本地socket信息,寫入sock_對(duì)應(yīng)的內(nèi)核區(qū)域 if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0) { exit(2); } } static void Listen(int socket) { if (listen(socket, gbacklog) < 0) { exit(3); } } static int Accept(int socket, std::string *clientip, uint16_t *clientport) { struct sockaddr_in peer; socklen_t len = sizeof(peer); int serviceSock = accept(socket, (struct sockaddr *)&peer, &len); if (serviceSock < 0) { // 獲取鏈接失敗 return -1; } if(clientport) *clientport = ntohs(peer.sin_port); if(clientip) *clientip = inet_ntoa(peer.sin_addr); return serviceSock; } };
Util.hpp文件:
#pragma once #include <iostream> #include <string> #include <unistd.h> #include <fcntl.h> class Util { public: static void SetNonBlock(int fd) { int fl = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, fl | O_NONBLOCK); } };
main.cc文件:
#include "TcpServer.hpp" #include "Service.hpp" #include <memory> using namespace std; static void usage(std::string process) { cerr << "\nUsage: " << process << " port\n" << endl; } int BeginHandler(Connection *conn, std::string &message, service_t service) { // 我們能保證,message一定是一個(gè)完整的報(bào)文,因?yàn)槲覀円呀?jīng)對(duì)它進(jìn)行了解碼 Request req; // 反序列化,進(jìn)行處理的問(wèn)題 if (!Parser(message, &req)) { // 寫回錯(cuò)誤消息 return -1; // 可以直接關(guān)閉連接 // conn->excepter_(conn); } // 業(yè)務(wù)邏輯 Response resp = service(req); std::cout << req.x << " " << req.op << " " << req.y << std::endl; std::cout << resp.code << " " << resp.result << std::endl; // 序列化 std::string sendstr; Serialize(resp, &sendstr); // 處理完畢的結(jié)果,發(fā)送回給client conn->outbuffer_ += sendstr; conn->sender_(conn); if(conn->outbuffer_.empty()) conn->R_->EnableReadWrite(conn->sock_, true, false); else conn->R_->EnableReadWrite(conn->sock_, true, true); std::cout << "這里就是上次的業(yè)務(wù)邏輯啦 --- end" << std::endl; return 0; } // 1 + 1X2 + 3X5 + 6X8 -> 1 + 1 int HandlerRequest(Connection *conn, std::string &message) { return BeginHandler(conn, message, calculator); } int main(int argc, char *argv[]) { if (argc != 2) { usage(argv[0]); exit(0); } // http.XXX("GET", "/aaa"); unique_ptr<TcpServer> svr(new TcpServer(HandlerRequest, atoi(argv[1]))); svr->Run(); return 0; }
Makefile文件:
server:main.cc g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f server
2.2.epoll?ET服務(wù)器源代碼講解
2.2.1.設(shè)計(jì)思路
epoll ET服務(wù)器:
在epoll ET服務(wù)器中,我們需要處理如下幾種事件:
??讀事件:如果是監(jiān)聽套接字的讀事件就緒則調(diào)用accept函數(shù)獲取底層的連接,如果是其他套接字的讀事件就緒則調(diào)用recv函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù)。
??寫事件:寫事件就緒則將待發(fā)送的數(shù)據(jù)寫入到發(fā)送緩沖區(qū)當(dāng)中。
??異常事件:當(dāng)某個(gè)套接字的異常事件就緒時(shí)我們不做過(guò)多處理,直接關(guān)閉該套接字。
當(dāng)epoll ET服務(wù)器監(jiān)測(cè)到某一事件就緒后,就會(huì)將該事件交給對(duì)應(yīng)的服務(wù)處理程序進(jìn)行處理。
Reactor模式的五個(gè)角色:
在這個(gè)epoll ET服務(wù)器中,Reactor模式中的五個(gè)角色對(duì)應(yīng)如下:
??句柄:文件描述符。
??同步事件分離器:I/O多路復(fù)用epoll。
??事件處理器:包括讀回調(diào)、寫回調(diào)和異?;卣{(diào)。
??具體事件處理器:讀回調(diào)、寫回調(diào)和異常回調(diào)的具體實(shí)現(xiàn)。
??初始分發(fā)器:TcpServer類當(dāng)中的Dispatcher函數(shù)。
Dispatcher函數(shù)要做的就是調(diào)用epoll_wait函數(shù)等待事件發(fā)生,當(dāng)有事件發(fā)生后就將就緒的事件派發(fā)給對(duì)應(yīng)的服務(wù)處理程序即可。
Connection類:
??在Reactor的工作流程中說(shuō)到,在注冊(cè)事件處理器時(shí)需要將其與Handle關(guān)聯(lián),本質(zhì)上就是需要將讀回調(diào)、寫回調(diào)和異?;卣{(diào)與某個(gè)文件描述符關(guān)聯(lián)起來(lái)。
??這樣做的目的就是為了當(dāng)某個(gè)文件描述符上的事件就緒時(shí)可以找到其對(duì)應(yīng)的各種回調(diào)函數(shù),進(jìn)而執(zhí)行對(duì)應(yīng)的回調(diào)方法來(lái)處理該事件。
所以我們可以設(shè)計(jì)一個(gè)Connection類,該類當(dāng)中的成員就包括一個(gè)文件描述符,以及該文件描述符對(duì)應(yīng)的各種回調(diào)函數(shù),此外還有一些其他成員,后面實(shí)現(xiàn)的時(shí)候再做詳細(xì)論述。
TcpServer類:
??在Reactor的工作流程中說(shuō)到,當(dāng)所有事件處理器注冊(cè)完畢后,會(huì)使用同步事件分離器等待這些事件發(fā)生,當(dāng)某個(gè)事件處理器的Handle變?yōu)镽eady狀態(tài)時(shí),同步事件分離器會(huì)通知初始分發(fā)器,然后初始分發(fā)器會(huì)將Ready狀態(tài)的Handle作為key來(lái)尋找其對(duì)應(yīng)的事件處理器,并調(diào)用該事件處理器中對(duì)應(yīng)的回調(diào)方法來(lái)響應(yīng)該事件。
??本質(zhì)就是當(dāng)事件注冊(cè)完畢后,會(huì)調(diào)用epoll_wait函數(shù)來(lái)等待這些事件發(fā)生,當(dāng)某個(gè)事件就緒時(shí)epoll_wait函數(shù)會(huì)告知調(diào)用方,然后調(diào)用方就根據(jù)就緒的文件描述符來(lái)找到其對(duì)應(yīng)的各種回調(diào)函數(shù),并調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)進(jìn)行事件處理。
對(duì)此我們可以設(shè)計(jì)一個(gè)TcpServer類。??該類當(dāng)中有一個(gè)成員函數(shù)叫做Dispatcher,這個(gè)函數(shù)就是所謂的初始分發(fā)器,在該函數(shù)內(nèi)部會(huì)調(diào)用epoll_wait函數(shù)等待事件的發(fā)生,當(dāng)事件發(fā)生后會(huì)告知Dispatcher已經(jīng)就緒的事件。
??當(dāng)事件就緒后需要根據(jù)就緒的文件描述符來(lái)找到其對(duì)應(yīng)的各種回調(diào)函數(shù),由于我們會(huì)將每個(gè)文件描述符及其對(duì)應(yīng)的各種回調(diào)都封裝到一個(gè)Connection結(jié)構(gòu)當(dāng)中,所以實(shí)際我們就是需要根據(jù)文件描述符找到其對(duì)應(yīng)的Connection結(jié)構(gòu)。
??我們可以使用C++ STL當(dāng)中的unordered_map,來(lái)建立各個(gè)文件描述符與其對(duì)應(yīng)的Connection結(jié)構(gòu)之間的映射,這個(gè)unordered_map可以作為TcpServer類的一個(gè)成員變量,當(dāng)需要找某個(gè)文件描述符的Connection結(jié)構(gòu)時(shí)就可以通過(guò)該成員變量找到。
此外,在TcpServer類當(dāng)中還有一些其他成員,后面實(shí)現(xiàn)的時(shí)候再做詳細(xì)論述。
epoll ET服務(wù)器的工作流程:
??這個(gè)epoll ET服務(wù)器在Reactor模式下的工作流程如下:
首先epoll ET服務(wù)器需要進(jìn)行套接字的創(chuàng)建、綁定和監(jiān)聽。
??然后定義一個(gè)TcpServer對(duì)象并初始化,初始化時(shí)要做的就是創(chuàng)建epoll模型。
??緊接著需要為監(jiān)聽套接字創(chuàng)建對(duì)應(yīng)的Connection結(jié)構(gòu),并調(diào)用TcpServer類中提供的AddConnection函數(shù)將監(jiān)聽套接字添加到epoll模型中,并建立監(jiān)聽套接字與其對(duì)應(yīng)的Connection結(jié)構(gòu)之間的映射關(guān)系。
??之后就可以不斷調(diào)用TcpServer類中的Dispatcher函數(shù)進(jìn)行事件派發(fā)。
在事件處理過(guò)程中,會(huì)不斷向Dispatcher當(dāng)中新增或刪除事件,而每個(gè)事件就緒時(shí)都會(huì)自動(dòng)調(diào)用其對(duì)應(yīng)的回調(diào)函數(shù)進(jìn)行處理,所以我們要做的就是不斷調(diào)用Dispatcher函數(shù)進(jìn)行事件派發(fā)即可。
2.2.2.Connection結(jié)構(gòu)
Connection結(jié)構(gòu)中除了包含文件描述符和其對(duì)應(yīng)的讀回調(diào)、寫回調(diào)和異?;卣{(diào)之外,還包含一個(gè)輸入緩沖區(qū)inbuffer、一個(gè)輸出緩沖區(qū)outbuffer以及一個(gè)回指指針R。
??當(dāng)某個(gè)文件描述符的讀事件就緒時(shí),我們會(huì)調(diào)用recv函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù),但我們并不能保證我們讀取到了一個(gè)完整的報(bào)文,因此需要將讀取到的數(shù)據(jù)暫時(shí)存放到該文件描述符對(duì)應(yīng)的inbuffer當(dāng)中,當(dāng)inbuffer當(dāng)中可以分離出一個(gè)完整的報(bào)文后再將其分離出來(lái)進(jìn)行數(shù)據(jù)處理,這里的inbuffer本質(zhì)就是用來(lái)解決粘包問(wèn)題的。
??當(dāng)處理完一個(gè)報(bào)文請(qǐng)求后,需要將響應(yīng)數(shù)據(jù)發(fā)送給客戶端,但我們并不能保證底層TCP的發(fā)送緩沖區(qū)中有足夠的空間供我們寫入,因此需要將要發(fā)送的數(shù)據(jù)暫時(shí)存放到該文件描述符對(duì)應(yīng)的outbuffer當(dāng)中,當(dāng)?shù)讓覶CP的發(fā)送緩沖區(qū)中有空間,即寫事件就緒時(shí),再依次發(fā)送outbuffer當(dāng)中的數(shù)據(jù)。
??Connection結(jié)構(gòu)當(dāng)中設(shè)置回指指針R,便于快速找到我們定義的TcpServer對(duì)象,因?yàn)楹罄m(xù)我們需要根據(jù)Connection結(jié)構(gòu)找到這個(gè)TcpServer對(duì)象。比如當(dāng)連接事件就緒時(shí),需要調(diào)用TcpServer類當(dāng)中的AddEvent函數(shù)將其添加到Dispatcher當(dāng)中。
此外,Connection結(jié)構(gòu)當(dāng)中需要提供管理回調(diào)的成員函數(shù),便于外部對(duì)Connection結(jié)構(gòu)當(dāng)中的各種回調(diào)進(jìn)行設(shè)置。
2.2.3.TcpServer類
在TcpServer類當(dāng)中有一個(gè)unordered_map成員,用于建立文件描述符和與其對(duì)應(yīng)的Connection結(jié)構(gòu)之間的映射,還有一個(gè)epfd成員,該成員是epoll模型對(duì)應(yīng)的文件描述符。
在初始化TcpServer對(duì)象的時(shí)候就可以調(diào)用epoll_create函數(shù)創(chuàng)建epoll模型,并將該epoll模型對(duì)應(yīng)的文件描述符用epfd成員記錄下來(lái),便于后續(xù)使用。
Reactor對(duì)象在析構(gòu)的時(shí)候,需要調(diào)用close函數(shù)將該epoll模型進(jìn)行關(guān)閉。
Dispatcher函數(shù)(事件分派器):
TcpServer類當(dāng)中的Dispatcher函數(shù)就是之前所說(shuō)的初始分發(fā)器,這里我們更形象的將其稱之為事件分派器。??事件分派器要做的就是調(diào)用epoll_wait函數(shù)等待事件發(fā)生。
??當(dāng)某個(gè)文件描述符上的事件發(fā)生后,先通過(guò)unordered_map找到該文件描述符對(duì)應(yīng)的Connection結(jié)構(gòu),然后調(diào)用Connection結(jié)構(gòu)當(dāng)中對(duì)應(yīng)的回調(diào)函數(shù)對(duì)該事件進(jìn)行處理即可。說(shuō)明一下:
??這里沒(méi)有用switch或if語(yǔ)句對(duì)epoll_wait函數(shù)的返回值進(jìn)行判斷,而是借用for循環(huán)對(duì)其返回值進(jìn)行了判斷。
??如果epoll_wait的返回值為-1則說(shuō)明epoll_wait函數(shù)調(diào)用失敗,此時(shí)不會(huì)進(jìn)入到for循環(huán)內(nèi)部進(jìn)行事件處理。
??如果epoll_wait的返回值為0則說(shuō)明epoll_wait函數(shù)超時(shí)返回,此時(shí)也不會(huì)進(jìn)入到for循環(huán)內(nèi)部進(jìn)行事件處理。
??如果epoll_wait的返回值大于0則說(shuō)明epoll_wait函數(shù)調(diào)用成功,此時(shí)才會(huì)進(jìn)入到for循環(huán)內(nèi)部調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)對(duì)事件進(jìn)行處理。
??事件處理時(shí)最好先對(duì)異常事件進(jìn)行處理,因此代碼中將異常事件的判斷放在了最前面。
AddConnection函數(shù):
TcpServer類當(dāng)中的AddConnection函數(shù)是用于進(jìn)行事件注冊(cè)的。
??在注冊(cè)事件時(shí)需要傳入一個(gè)文件描述符和一個(gè)事件集合,表示需要監(jiān)視哪個(gè)文件描述符上的哪些事件。
??還需要傳入該文件描述符對(duì)應(yīng)的Connection結(jié)構(gòu),表示當(dāng)該文件描述符上的事件就緒后應(yīng)該執(zhí)行的回調(diào)方法。
??在AddEvent函數(shù)內(nèi)部要做的就是,調(diào)用epoll_ctl函數(shù)將該文件描述符及其對(duì)應(yīng)的事件集合注冊(cè)到epoll模型當(dāng)中,然后建立該文件描述符與其對(duì)應(yīng)的EventItem結(jié)構(gòu)的映射關(guān)系。
EnableReadWrite函數(shù):
Reactor類當(dāng)中的EnableReadWrite函數(shù),用于使能或使能某個(gè)文件描述符的讀寫事件。
??調(diào)用EnableReadWrite函數(shù)時(shí)需要傳入一個(gè)文件描述符,表示需要設(shè)置的是哪個(gè)文件描述符對(duì)應(yīng)的事件。
??還需要傳入兩個(gè)bool值,分別表示需要使能還是使能讀寫事件。
??EnableReadWrite函數(shù)內(nèi)部會(huì)調(diào)用epoll_ctl函數(shù)修改將該文件描述符的監(jiān)聽事件。
2.2.4.回調(diào)函數(shù)
下面我們就可以實(shí)現(xiàn)一些回調(diào)函數(shù),這里主要實(shí)現(xiàn)四個(gè)回調(diào)函數(shù)。
??Accepter:當(dāng)連接事件到來(lái)時(shí)可以調(diào)用該回調(diào)函數(shù)獲取底層建立好的連接。
??TcpRecver:當(dāng)讀事件就緒時(shí)可以調(diào)用該回調(diào)函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù)并進(jìn)行處理。
??TcpSender:當(dāng)寫事件就緒時(shí)可以調(diào)用該回調(diào)函數(shù)向客戶端發(fā)送響應(yīng)數(shù)據(jù)。
??TcpExcepter:當(dāng)異常事件就緒時(shí)可以調(diào)用該函數(shù)將對(duì)應(yīng)的文件描述符進(jìn)行關(guān)閉。
當(dāng)我們?yōu)槟硞€(gè)文件描述符創(chuàng)建Connection結(jié)構(gòu)時(shí),就可以調(diào)用Connection類提供的回調(diào)設(shè)置函數(shù)(SetRecver、SetSender、SetExcepter),將這些回調(diào)函數(shù)到Connection結(jié)構(gòu)當(dāng)中。??我們會(huì)將監(jiān)聽套接字對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中的recver_設(shè)置為Accepter,因?yàn)楸O(jiān)聽套接字的讀事件就緒就意味著連接事件就緒了,而監(jiān)聽套接字一般只關(guān)心讀事件,因此監(jiān)聽套接字對(duì)應(yīng)的sender_和excepter_可以設(shè)置為nullptr。
??當(dāng)Dispatcher監(jiān)測(cè)到監(jiān)聽套接字的讀事件就緒時(shí),會(huì)調(diào)用監(jiān)聽套接字對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中的recver_回調(diào),此時(shí)就會(huì)調(diào)用Accepter回調(diào)獲取底層建立好的連接。
??而對(duì)于與客戶端建立連接的套接字,我們會(huì)將其對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中的recver_、sender_和excepter_分別設(shè)置為這里的TcpRecver、TcpSender和TcpExcepter。
??當(dāng)Dispatcher監(jiān)測(cè)到這些套接字的事件就緒時(shí),就會(huì)調(diào)用其對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中對(duì)應(yīng)的回調(diào)函數(shù),也就是這里的TcpRecver、TcpSender和TcpExcepter。
Accepter回調(diào):
Accepter回調(diào)用于處理連接事件,其工作流程如下:
1.調(diào)用accept函數(shù)獲取底層建立好的連接。
2.將獲取到的套接字設(shè)置為非阻塞,并為其創(chuàng)建Connection結(jié)構(gòu),填充Connection結(jié)構(gòu)當(dāng)中的各個(gè)字段,并注冊(cè)該套接字相關(guān)的回調(diào)方法。
3.將該套接字及其對(duì)應(yīng)需要關(guān)心的事件注冊(cè)到Dispatcher當(dāng)中。
下一次Dispatcher在進(jìn)行事件派發(fā)時(shí)就會(huì)幫我們關(guān)注該套接字對(duì)應(yīng)的事件,當(dāng)事件就緒時(shí)就會(huì)執(zhí)行該套接字對(duì)應(yīng)的Connection結(jié)構(gòu)中對(duì)應(yīng)的回調(diào)方法。需要注意的是,因?yàn)檫@里實(shí)現(xiàn)的ET模式下的epoll服務(wù)器,因此在獲取底層連接時(shí)需要循環(huán)調(diào)用accept函數(shù)進(jìn)行讀取,并且監(jiān)聽套接字必須設(shè)置為非阻塞。
??因?yàn)镋T模式下只有當(dāng)?shù)讓咏⒌倪B接從無(wú)到有或是從有到多時(shí)才會(huì)通知上層,如果沒(méi)有一次性將底層建立好的連接全部獲取,并且此后再也沒(méi)有建立好的連接,那么底層沒(méi)有讀取完的連接就相當(dāng)于丟失了,所以需要循環(huán)多次調(diào)用accept函數(shù)獲取底層建立好的連接。
??循環(huán)調(diào)用accept函數(shù)也就意味著,當(dāng)?shù)讓舆B接全部被獲取后再調(diào)用accept函數(shù),此時(shí)就會(huì)因?yàn)榈讓右呀?jīng)沒(méi)有連接了而被阻塞住,因此需要將監(jiān)聽套接字設(shè)置為非阻塞,這樣當(dāng)?shù)讓記](méi)有連接時(shí)accept就會(huì)返回,而不會(huì)被阻塞住。
accept獲取到的新的套接字也需要設(shè)置為非阻塞,就是為了避免將來(lái)循環(huán)調(diào)用recv、send等函數(shù)時(shí)被阻塞。設(shè)置文件描述符為非阻塞:
設(shè)置文件描述符為非阻塞時(shí),需要先調(diào)用fcntl函數(shù)獲取該文件描述符對(duì)應(yīng)的文件狀態(tài)標(biāo)記,然后在該文件狀態(tài)標(biāo)記的基礎(chǔ)上添加非阻塞標(biāo)記O_NONBLOCK,最后調(diào)用fcntl函數(shù)對(duì)該文件描述符的狀態(tài)標(biāo)記進(jìn)行設(shè)置即可。
監(jiān)聽套接字設(shè)置為非阻塞后,當(dāng)?shù)讓舆B接不就緒時(shí),accept函數(shù)會(huì)以出錯(cuò)的形式返回,因此當(dāng)調(diào)用accept函數(shù)的返回值小于0時(shí),需要繼續(xù)判斷錯(cuò)誤碼。
??如果錯(cuò)誤碼為EAGAIN或EWOULDBLOCK,說(shuō)明本次出錯(cuò)返回是因?yàn)榈讓右呀?jīng)沒(méi)有可獲取的連接了,此時(shí)底層連接全部獲取完畢,這時(shí)我們可以返回0,表示本次Accepter調(diào)用成功。
??如果錯(cuò)誤碼為EINTR,說(shuō)明本次調(diào)用accept函數(shù)獲取底層連接時(shí)被信號(hào)中斷了,這時(shí)還應(yīng)該繼續(xù)調(diào)用accept函數(shù)進(jìn)行獲取。
??除此之外,才說(shuō)明accept函數(shù)是真正調(diào)用失敗了,這時(shí)我們可以返回-1,表示本次Accepter調(diào)用失敗。問(wèn)題:accept、recv和send等IO系統(tǒng)調(diào)用為什么會(huì)被信號(hào)中斷?
答:IO系統(tǒng)調(diào)用函數(shù)出錯(cuò)返回并且將錯(cuò)誤碼設(shè)置為EINTR,表明本次在進(jìn)行數(shù)據(jù)讀取或數(shù)據(jù)寫入之前被信號(hào)中斷了,也就是說(shuō)IO系統(tǒng)調(diào)用在陷入內(nèi)核,但并沒(méi)有返回用戶態(tài)的時(shí)候內(nèi)核跑去處理其他信號(hào)了。
??在內(nèi)核態(tài)返回用戶態(tài)之前會(huì)檢查信號(hào)的pending位圖,也就是未決信號(hào)集,如果pending位圖中有未處理的信號(hào),那么內(nèi)核就會(huì)對(duì)該信號(hào)進(jìn)行處理。
??但I(xiàn)O系統(tǒng)調(diào)用函數(shù)在進(jìn)行IO操作之前就被信號(hào)中斷了,這實(shí)際上是一個(gè)特例,因?yàn)镮O過(guò)程分為“等”和“拷貝”兩個(gè)步驟,而一般“等”的過(guò)程比較漫長(zhǎng),而在這個(gè)過(guò)程中我們的執(zhí)行流其實(shí)是處于閑置的狀態(tài)的,因此在“等”的過(guò)程中如果有信號(hào)產(chǎn)生,內(nèi)核就會(huì)立即進(jìn)行信號(hào)的處理。寫事件是按需打開的:
這里調(diào)用accept獲取上來(lái)的套接字在添加到Dispatcher中時(shí),只添加了EOPLLIN和EPOLLET事件,也就是說(shuō)只讓epoll幫我們關(guān)心該套接字的讀事件。
??這里之所以沒(méi)有添加寫事件,是因?yàn)楫?dāng)前我們并沒(méi)有要發(fā)送的數(shù)據(jù),因此沒(méi)有必要讓epoll幫我們關(guān)心寫事件。
??一般讀事件是經(jīng)常會(huì)被設(shè)置的,而寫事件則是按序打開的,只有當(dāng)我們有數(shù)據(jù)要發(fā)送時(shí)才會(huì)將寫事件打開,并且在數(shù)據(jù)全部寫入完畢后又會(huì)立即將寫事件關(guān)閉。注:
1.如果LT模式,一定是要先檢測(cè)有沒(méi)有對(duì)應(yīng)的空間(先打開對(duì)寫事件的關(guān)心,epoll會(huì)自動(dòng)進(jìn)行事件派發(fā)),然后才寫入。LT模式下,只要你打開了寫入,我們的代碼會(huì)自動(dòng)進(jìn)行調(diào)用TcpSender方法,進(jìn)行發(fā)送。
2.如果是ET模式,也可以采用上面的方法。不過(guò),一般ET我們追求高效,直接發(fā)送。通過(guò)發(fā)送是否全部發(fā)送完成,來(lái)決定是否要進(jìn)行打開對(duì)寫事件進(jìn)行關(guān)心:a.先調(diào)用send函數(shù)發(fā)送,發(fā)完就完了。
b.如果沒(méi)有send發(fā)完,打開寫事件關(guān)心,讓epoll自動(dòng)幫我們進(jìn)行發(fā)送。
一般寫事件關(guān)心,不能常打開,一定是需要的時(shí)候,在進(jìn)行打開)不需要就要關(guān)閉對(duì)寫事件的關(guān)心。
TcpRecver回調(diào):
TcpRecver回調(diào)用于處理讀事件,其工作流程如下:
1.循環(huán)調(diào)用recv函數(shù)讀取數(shù)據(jù),并將讀取到的數(shù)據(jù)添加到該套接字對(duì)應(yīng)EventItem結(jié)構(gòu)的inbuffer當(dāng)中。
2.對(duì)inbuffer當(dāng)中的數(shù)據(jù)進(jìn)行切割,將完整的報(bào)文切割出來(lái),剩余的留在inbuffer當(dāng)中。
3.對(duì)切割出來(lái)的完整報(bào)文進(jìn)行反序列化。
4.業(yè)務(wù)處理。
5.業(yè)務(wù)處理后形成響應(yīng)報(bào)文。
6.將響應(yīng)報(bào)頭添加到對(duì)應(yīng)EventItem結(jié)構(gòu)的outbuffer當(dāng)中,并打開寫事件。
下一次Dispatcher在進(jìn)行事件派發(fā)時(shí)就會(huì)幫我們關(guān)注該套接字的寫事件,當(dāng)寫事件就緒時(shí)就會(huì)執(zhí)行該套接字對(duì)應(yīng)的EventItem結(jié)構(gòu)中寫回調(diào)方法,進(jìn)而將outbuffer中的響應(yīng)數(shù)據(jù)發(fā)送給客戶端。數(shù)據(jù)讀?。?/strong>
??循環(huán)調(diào)用recv函數(shù)將讀取到的數(shù)據(jù)添加到inbuffer當(dāng)中。
??當(dāng)recv函數(shù)的返回值小于0時(shí)同樣需要進(jìn)一步判斷錯(cuò)誤碼,如果錯(cuò)誤碼為EAGAIN或EWOULDBLOCK則說(shuō)明底層數(shù)據(jù)讀取完畢了,如果錯(cuò)誤碼為EINTR則說(shuō)明讀取過(guò)程被信號(hào)中斷了,此時(shí)還需要繼續(xù)調(diào)用recv函數(shù)進(jìn)行讀取,否則就是讀取出錯(cuò)了。
??當(dāng)讀取出錯(cuò)時(shí)直接調(diào)用該套接字對(duì)應(yīng)的excepter_回調(diào),最終就會(huì)調(diào)用到下面將要實(shí)現(xiàn)的TcpExcepter回調(diào),在我們會(huì)在TcpExcepter回調(diào)當(dāng)中將該套接字進(jìn)行關(guān)閉。報(bào)文切割:
報(bào)文切割本質(zhì)就是為了防止粘包問(wèn)題,而粘包問(wèn)題實(shí)際是涉及到協(xié)議定制的。
??因?yàn)槲覀冃枰鶕?jù)協(xié)議知道如何將各個(gè)報(bào)文進(jìn)行分離,比如UDP分離報(bào)文采用的就是定長(zhǎng)報(bào)頭+自描述字段。
??我們的目的是演示整個(gè)數(shù)據(jù)處理的過(guò)程,為了簡(jiǎn)單起見(jiàn)就不進(jìn)行過(guò)于復(fù)雜的協(xié)議定制了,這里我們就以“X”作為各個(gè)報(bào)文之間的分隔符,每個(gè)報(bào)文的最后都會(huì)以一個(gè)“X”作為報(bào)文結(jié)束的標(biāo)志。
??因此現(xiàn)在要做的就是以“X”作為分隔符對(duì)inbuffer當(dāng)中的字符串進(jìn)行切割,這里將這個(gè)過(guò)程封裝成一個(gè)PackageSplit函數(shù)。
??PackageSplit函數(shù)要做的就是對(duì)inbuffer當(dāng)中的字符串進(jìn)行切割,將切割出來(lái)的一個(gè)個(gè)報(bào)文放到vector當(dāng)中,對(duì)于最后無(wú)法切出完整報(bào)文的數(shù)據(jù)就留在inbuffer當(dāng)中即可。反序列化:
在數(shù)據(jù)發(fā)送之前需要進(jìn)行序列化Serialize,接收到數(shù)據(jù)之后需要對(duì)數(shù)據(jù)進(jìn)行反序列化Parser。
??序列化就是將對(duì)象的狀態(tài)信息轉(zhuǎn)換為可以存儲(chǔ)或傳輸?shù)男问剑ㄗ止?jié)序列)的過(guò)程。
??反序列化就是把字節(jié)序列恢復(fù)為原對(duì)象的過(guò)程。
實(shí)際反序列化也是與協(xié)議定制相關(guān)的,假設(shè)這里的epoll服務(wù)器向客戶端提供的就是計(jì)算服務(wù),客戶端向服務(wù)器發(fā)來(lái)的都是需要服務(wù)器計(jì)算的計(jì)算表達(dá)式,因此可以用一個(gè)結(jié)構(gòu)體來(lái)描述這樣一個(gè)計(jì)算表達(dá)式,結(jié)構(gòu)體當(dāng)中包含兩個(gè)操作數(shù)x和y,以及一個(gè)操作符op。此時(shí)這里所謂的反序列化就是將一個(gè)計(jì)算表達(dá)式轉(zhuǎn)換成這樣一個(gè)結(jié)構(gòu)體,
??因此現(xiàn)在要做的就是將形如“1 + 2”這樣的計(jì)算表達(dá)式轉(zhuǎn)換成一個(gè)結(jié)構(gòu)體,該結(jié)構(gòu)體當(dāng)中的x成員的值就是1,y的值就是2,op的值就是‘+’,這里將這個(gè)過(guò)程封裝成一個(gè)Parser函數(shù)。
說(shuō)明一下:?實(shí)際在做項(xiàng)目時(shí)不需要我們自己進(jìn)行序列化和反序列化,我們一般會(huì)直接用JSON或XML這樣的序列化反序列化工具。
業(yè)務(wù)處理:
業(yè)務(wù)處理就是服務(wù)器拿到客戶端發(fā)來(lái)的數(shù)據(jù)后,對(duì)數(shù)據(jù)進(jìn)行數(shù)據(jù)分析,最終拿到客戶端想要的資源。
??我們這里要做的業(yè)務(wù)處理非常簡(jiǎn)單,就是用反序列化后的數(shù)據(jù)進(jìn)行數(shù)據(jù)計(jì)算,此時(shí)得到的計(jì)算結(jié)果就是客戶端想要的。
形成響應(yīng)報(bào)文:
在業(yè)務(wù)處理后我們已經(jīng)拿到了客戶端想要的數(shù)據(jù),現(xiàn)在我們要做的就是形成響應(yīng)報(bào)文,由于我們這里規(guī)定每個(gè)報(bào)文都以“X”作為報(bào)文結(jié)束的標(biāo)志,因此在形成響應(yīng)報(bào)文的時(shí)候,就需要在每一個(gè)計(jì)算結(jié)果后面都添加上一個(gè)“X”,表示這是之前某一個(gè)請(qǐng)求報(bào)文的響應(yīng)報(bào)文,因?yàn)閰f(xié)議制定后就需要雙方遵守。
將響應(yīng)報(bào)文添加到outbuffer中:
響應(yīng)報(bào)文構(gòu)建完后需要將其添加到該套接字對(duì)應(yīng)的outbuffer中,并打開該套接字的寫事件,此后當(dāng)寫事件就緒時(shí)就會(huì)將outbuffer當(dāng)中的數(shù)據(jù)發(fā)送出去。
TcpSender回調(diào):
TcpSender回調(diào)用于處理寫事件,其工作流程如下:
1.循環(huán)調(diào)用send函數(shù)發(fā)送數(shù)據(jù),并將發(fā)送出去的數(shù)據(jù)從該套接字對(duì)應(yīng)Connection結(jié)構(gòu)的outbuffer中刪除。
2.如果循環(huán)調(diào)用send函數(shù)后該套接字對(duì)應(yīng)的outbuffer當(dāng)中的數(shù)據(jù)被全部發(fā)送,此時(shí)就需要將該套接字對(duì)應(yīng)的寫事件關(guān)閉,因?yàn)橐呀?jīng)沒(méi)有要發(fā)送的數(shù)據(jù)了,如果outbuffer當(dāng)中的數(shù)據(jù)還有剩余,那么該套接字對(duì)應(yīng)的寫事件就應(yīng)該繼續(xù)打開。循環(huán)調(diào)用send函數(shù)發(fā)送數(shù)據(jù)的過(guò)程:
??循環(huán)調(diào)用send函數(shù)將outbuffer中的數(shù)據(jù)發(fā)送出去。
??當(dāng)send函數(shù)的返回值小于0時(shí)也需要進(jìn)一步判斷錯(cuò)誤碼,如果錯(cuò)誤碼為EAGAIN或EWOULDBLOCK則說(shuō)明底層TCP發(fā)送緩沖區(qū)已經(jīng)被寫滿了,這時(shí)需要將已經(jīng)發(fā)送的數(shù)據(jù)從outbuffer中移除。
??如果錯(cuò)誤碼為EINTR則說(shuō)明發(fā)送過(guò)程被信號(hào)中斷了,此時(shí)還需要繼續(xù)調(diào)用send函數(shù)進(jìn)行發(fā)送,否則就是發(fā)送出錯(cuò)了。
??當(dāng)發(fā)送出錯(cuò)時(shí)也直接調(diào)用該套接字對(duì)應(yīng)的excepter_回調(diào),最終就會(huì)調(diào)用到下面將要實(shí)現(xiàn)的TcpExcepter回調(diào),在我們會(huì)在TcpExcepter回調(diào)當(dāng)中將該套接字進(jìn)行關(guān)閉。
??如果最終outbuffer當(dāng)中的數(shù)據(jù)全部發(fā)送成功,則將outbuffer清空即可。
TcpExcepter回調(diào):
errorer回調(diào)用于處理異常事件。
??對(duì)于異常事件就緒的套接字我們這里不做其他過(guò)多的處理,簡(jiǎn)單的調(diào)用close函數(shù)將該套接字關(guān)閉即可。
??但是在關(guān)閉該套接字之前,需要先調(diào)用DelEvent函數(shù)將該套接字從epoll模型中刪除,并取消該套接字與其對(duì)應(yīng)的Connection結(jié)構(gòu)的映射關(guān)系。
??由于在Dispatcher當(dāng)中是先處理的異常事件,為了避免該套接字被關(guān)閉后繼續(xù)進(jìn)行讀寫操作,然后因?yàn)樽x寫操作失敗再次調(diào)用errorer回調(diào)重復(fù)關(guān)閉該文件描述符,因此在關(guān)閉該套接字后將其Connection當(dāng)中的文件描述符值設(shè)置為-1。
??在調(diào)用TcpRecver和TcpSender回調(diào)執(zhí)行讀寫操作之前,都會(huì)判斷該Connection結(jié)構(gòu)當(dāng)中的文件描述符值是否有效,如果無(wú)效則不會(huì)進(jìn)行后續(xù)操作。
2.2.5.套接字相關(guān)
這里可以編寫一個(gè)Socket類,對(duì)套接字相關(guān)的接口進(jìn)行一定程度的封裝,為了讓外部能夠直接調(diào)用Socket類當(dāng)中封裝的函數(shù),于是將這些函數(shù)定義成了靜態(tài)成員函數(shù)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-481182.html
2.2.6.Reactor模式和Proactor模式
??Reactor:半異步半同步,Reactor (tcp)服務(wù)器是Linux服務(wù)最常用的(幾乎沒(méi)有之一),既負(fù)責(zé)事件派發(fā),又負(fù)責(zé)IO(負(fù)責(zé)業(yè)務(wù)邏輯的處理)
??Proactor:前攝模式,其他平臺(tái)可能出現(xiàn)的模式。只負(fù)責(zé)負(fù)責(zé)事件派發(fā),就緒的事件推送給后端的進(jìn)程、線程池,不關(guān)心負(fù)責(zé)IO(不負(fù)責(zé)業(yè)務(wù)邏輯的處理)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-481182.html
到了這里,關(guān)于Linux - 第25節(jié) - Linux高級(jí)IO(三)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!