国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Linux - 第25節(jié) - Linux高級(jí)IO(三)

這篇具有很好參考價(jià)值的文章主要介紹了Linux - 第25節(jié) - Linux高級(jí)IO(三)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

1.Reactor模式

1.1.Reactor模式的定義

1.2.Reactor模式的角色構(gòu)成

1.3.Reactor模式的工作流程

2.epoll?ET服務(wù)器(Reactor模式)

2.1.epoll?ET服務(wù)器源代碼

2.2.epoll?ET服務(wù)器源代碼講解

2.2.1.設(shè)計(jì)思路

2.2.2.Connection結(jié)構(gòu)

2.2.3.TcpServer類

2.2.4.回調(diào)函數(shù)

2.2.5.套接字相關(guān)

2.2.6.Reactor模式和Proactor模式


1.Reactor模式

1.1.Reactor模式的定義

Reactor反應(yīng)器模式,也叫做分發(fā)者模式或通知者模式,是一種將就緒事件派發(fā)給對(duì)應(yīng)服務(wù)處理程序的事件設(shè)計(jì)模式。

Linux - 第25節(jié) - Linux高級(jí)IO(三)

1.2.Reactor模式的角色構(gòu)成

Reactor主要由以下五個(gè)角色構(gòu)成:

角色 解釋
Handle(句柄) 用于標(biāo)識(shí)不同的事件,本質(zhì)就是一個(gè)文件描述符。
Sychronous Event Demultiplexer(同步事件分離器) 本質(zhì)就是一個(gè)系統(tǒng)調(diào)用,用于等待事件的發(fā)生。對(duì)于Linux來(lái)說(shuō),同步事件分離器指的就是I/O多路復(fù)用,比如select、poll、epoll等。
Event Handler(事件處理器) 由多個(gè)回調(diào)方法構(gòu)成,這些回調(diào)方法構(gòu)成了與應(yīng)用相關(guān)的對(duì)于某個(gè)事件的處理反饋。
Concrete Event Handler(具體事件處理器) 事件處理器中各個(gè)回調(diào)方法的具體實(shí)現(xiàn)。
Initiation Dispatcher(初始分發(fā)器) 初始分發(fā)器實(shí)際上就是Reactor角色,初始分發(fā)器會(huì)通過(guò)同步事件分離器來(lái)等待事件的發(fā)生,當(dāng)對(duì)應(yīng)事件就緒時(shí)就調(diào)用事件處理器,最后調(diào)用對(duì)應(yīng)的回調(diào)方法來(lái)處理這個(gè)事件。

1.3.Reactor模式的工作流程

Reactor模式的工作流程如下:

1.當(dāng)應(yīng)用向初始分發(fā)器注冊(cè)具體事件處理器時(shí),應(yīng)用會(huì)標(biāo)識(shí)出該事件處理器希望初始分發(fā)器在某個(gè)事件發(fā)生時(shí)向其通知,該事件與Handle關(guān)聯(lián)。
2.初始分發(fā)器會(huì)要求每個(gè)事件處理器向其傳遞內(nèi)部的Handle,該Handle向操作系統(tǒng)標(biāo)識(shí)了事件處理器。
3.當(dāng)所有的事件處理器注冊(cè)完畢后,應(yīng)用會(huì)啟動(dòng)初始分發(fā)器的事件循環(huán),這時(shí)初始分發(fā)器會(huì)將每個(gè)事件處理器的Handle合并起來(lái),并使用同步事件分離器等待這些事件的發(fā)生。
4.當(dāng)某個(gè)事件處理器的Handle變?yōu)镽eady狀態(tài)時(shí),同步事件分離器會(huì)通知初始分發(fā)器。
5.初始分發(fā)器會(huì)將Ready狀態(tài)的Handle作為key,來(lái)尋找其對(duì)應(yīng)的事件處理器。
6.初始分發(fā)器會(huì)調(diào)用其對(duì)應(yīng)事件處理器當(dāng)中對(duì)應(yīng)的回調(diào)方法來(lái)響應(yīng)該事件。


2.epoll?ET服務(wù)器(Reactor模式)

2.1.epoll?ET服務(wù)器源代碼

創(chuàng)建TcpServer.hpp文件,寫入下圖一所示的代碼,創(chuàng)建Epoller.hpp文件,寫入下圖二所示的代碼,創(chuàng)建Log.hpp文件,寫入下圖三所示的代碼,創(chuàng)建Protocol.hpp文件,寫入下圖四所示的代碼,創(chuàng)建Service.hpp文件,寫入下圖五所示的代碼,創(chuàng)建Sock.hpp文件,寫入下圖六所示的代碼,創(chuàng)建Util.hpp文件,寫入下圖七所示的代碼,創(chuàng)建main.cc文件,寫入下圖八所示的代碼,創(chuàng)建Makefile文件,寫入下圖九所示的代碼,使用make命令生成server可執(zhí)行程序,使用./server 8080命令運(yùn)行server可執(zhí)行程序,創(chuàng)建兩個(gè)新選項(xiàng)卡作為客戶端,分別使用telnet 127.0.0.1 8080命令連接服務(wù)端,輸入ctrl+]進(jìn)入telnet行,然后回車并輸入消息內(nèi)容發(fā)送給服務(wù)端,服務(wù)端收到消息后進(jìn)行計(jì)算并將計(jì)算結(jié)果返回給客戶端,客戶端發(fā)送完消息后輸入ctrl+]進(jìn)入telnet行,然后輸入quit退出,如下圖十所示。

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

Linux - 第25節(jié) - Linux高級(jí)IO(三)

TcpServer.hpp文件:

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <cerrno>
#include <unordered_map>
#include <functional>
#include "Sock.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "Util.hpp"
#include "Protocol.hpp"

// 基于Reactor模式,編寫一個(gè)充分讀取和寫入的,EPOLL(ET)的Server

class Connection;
class TcpServer;

using func_t = std::function<int(Connection *)>;
using callback_t = std::function<int(Connection *, std::string &)>;

// event
class Connection
{
public:
    // 文件描述符
    int sock_;
    TcpServer *R_;
    // 自己的接受和發(fā)送緩沖區(qū)
    std::string inbuffer_;
    std::string outbuffer_;
    // 回調(diào)函數(shù)
    func_t recver_; 
    func_t sender_;
    func_t excepter_;

public:
    Connection(int sock, TcpServer *r) : sock_(sock), R_(r)
    {
    }
    void SetRecver(func_t recver) { recver_ = recver; }
    void SetSender(func_t sender) { sender_ = sender; }
    void SetExcepter(func_t excepter) { excepter_ = excepter; }
    ~Connection() {}
};

//Reactor
//單進(jìn)程:半異步半同步 -- Reactor -- Linux服務(wù)最常用 -- 幾乎沒(méi)有之一
// Reactor(tcp)服務(wù)器, 即負(fù)責(zé)事件派發(fā), 又負(fù)責(zé)IO [又負(fù)責(zé)業(yè)務(wù)邏輯的處理]

// Proactor: 前攝模式 -- 其他平臺(tái)可能出現(xiàn)的模式
// 只負(fù)責(zé)負(fù)責(zé)事件派發(fā),就緒的事件推送給后端的進(jìn)程、線程池, 不關(guān)心 負(fù)責(zé)IO [又負(fù)責(zé)業(yè)務(wù)邏輯的處理]

class TcpServer
{
public:
    TcpServer(callback_t cb, int port = 8080) : cb_(cb)
    {
        revs_ = new struct epoll_event[revs_num];
        // 網(wǎng)絡(luò)功能
        listensock_ = Sock::Socket();
        Util::SetNonBlock(listensock_);
        Sock::Bind(listensock_, port);
        Sock::Listen(listensock_);

        // 多路轉(zhuǎn)接
        epfd_ = Epoller::CreateEpoller();

        // 添加listensock匹配的connection
        AddConnection(listensock_, EPOLLIN | EPOLLET,
                      std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
    }
    void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter)
    {
        if (event & EPOLLET)
            Util::SetNonBlock(sockfd);
        // 添加sockfd到epoll
        Epoller::AddEvent(epfd_, sockfd, event);
        // 將sockfd匹配的Connection也添加到當(dāng)前的unordered_map中
        Connection *conn = new Connection(sockfd, this);
        conn->SetRecver(recver);
        conn->SetSender(sender);
        conn->SetExcepter(excepter);

        connections_.insert(std::make_pair(sockfd, conn));
        logMessage(DEBUG, "添加新鏈接到connections成功: %d", sockfd);
    }
    int Accepter(Connection *conn)
    {
        // demo - listensock 也是工作在ET,來(lái)一個(gè)連接,對(duì)應(yīng)就有事件就緒,那么如何來(lái)一批呢?
        while (true)
        {
            std::string clientip;
            uint16_t clientport = 0;
            int sockfd = Sock::Accept(conn->sock_, &clientip, &clientport);
            if (sockfd < 0)
            {
                if (errno == EINTR)
                    continue;
                else if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else
                {
                    logMessage(WARNING, "accept error");
                    return -1;
                }
            }
            logMessage(DEBUG, "get a new link: %d", sockfd);
            // 注意:默認(rèn)我們只設(shè)置了讓epoll幫我們關(guān)心讀事件,沒(méi)有關(guān)心寫事件
            // 為什么沒(méi)有關(guān)注寫事件:因?yàn)樽铋_始的時(shí)候,寫空間一定是就緒的!
            // 運(yùn)行中可能會(huì)存在條件不滿足 -- 寫空間被寫滿了
            AddConnection(sockfd, EPOLLIN | EPOLLET,
                          std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1),
                          std::bind(&TcpServer::TcpSender, this, std::placeholders::_1),
                          std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1));
        }
        return 0;
    }

    int TcpRecver(Connection *conn)
    {
        // XXXXXXX\3XXXXXX\3
        while (true)
        {
            char buffer[1024];
            ssize_t s = recv(conn->sock_, buffer, sizeof(buffer) - 1, 0);
            if (s > 0)
            {
                buffer[s] = 0;
                conn->inbuffer_ += buffer;
            }
            else if (s == 0)
            {
                logMessage(DEBUG, "client quit");
                conn->excepter_(conn);
                break;
            }
            else
            {
                if (errno == EINTR)
                    continue;
                else if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else
                {
                    // 出錯(cuò)了
                    logMessage(DEBUG, "recv error: %d:%s", errno, strerror(errno));
                    conn->excepter_(conn);
                    break;
                }
            }
        }
        // 將本輪全部讀取完畢
        std::vector<std::string> result;
        PackageSplit(conn->inbuffer_, &result);
        for (auto &message : result)
        {
            cb_(conn, message);
        }
        return 0;
    }
    int TcpSender(Connection *conn)
    {
        while(true)
        {
            ssize_t n = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
            if(n > 0)
            {
                // 去除已經(jīng)成功發(fā)送的數(shù)據(jù)
                conn->outbuffer_.erase(0, n);
            }
            else
            {
                if(errno == EINTR) continue;
                else if(errno == EAGAIN || errno == EWOULDBLOCK) break; //發(fā)完了,一定是outbuffer清空了嗎?不一定(EPOLLOUT打開)
                else
                {
                    conn->excepter_(conn);
                    logMessage(DEBUG, "send error: %d:%s", errno, strerror(errno));
                    break;
                }
            }
        }

        return 0;
    }
    int TcpExcepter(Connection *conn)
    {
        // 0.
        if(!IsExists(conn->sock_)) return -1;
        
        // 所有的服務(wù)器異常,都會(huì)被歸類到這里
        // 坑:一定要先從epoll中移除,然后再關(guān)閉fd
        // 1.
        Epoller::DelEvent(epfd_, conn->sock_);
        logMessage(DEBUG, "remove epoll event!");
        // 2.
        close(conn->sock_);
        logMessage(DEBUG, "close fd: %d", conn->sock_);
        // 3. delete conn;
        delete connections_[conn->sock_];
        logMessage(DEBUG, "delete connection object done");
        // 4. 
        connections_.erase(conn->sock_);
        logMessage(DEBUG, "erase connection from connections");

        return 0;
    }
    bool IsExists(int sock)
    {
        auto iter = connections_.find(sock);
        if (iter == connections_.end())
            return false;
        else
            return true;
    }
    // 打開或者關(guān)閉對(duì)于特定socket是否要關(guān)心讀或者寫
    //EnableReadWrite(sock, true, false);
    //EnableReadWrite(sock, true, true);
    void EnableReadWrite(int sock, bool readable, bool writeable)
    {
        uint32_t event = 0;
        event |= (readable ? EPOLLIN : 0);
        event |= (writeable ? EPOLLOUT : 0);
        Epoller::ModEvent(epfd_, sock, event);
    }
    // 根據(jù)就緒事件,將事件進(jìn)行事件派發(fā)
    void Dispatcher()
    {
        int n = Epoller::LoopOnce(epfd_, revs_, revs_num);
        for (int i = 0; i < n; i++)
        {
            int sock = revs_[i].data.fd;
            uint32_t revent = revs_[i].events;
            if(revent & EPOLLHUP) revent |= (EPOLLIN|EPOLLOUT);
            if(revent & EPOLLERR) revent |= (EPOLLIN|EPOLLOUT);

            if (revent & EPOLLIN)
            {
                if (IsExists(sock) && connections_[sock]->recver_)
                    connections_[sock]->recver_(connections_[sock]);
            }
            if (revent & EPOLLOUT)
            {
                if (IsExists(sock) && connections_[sock]->sender_)
                    connections_[sock]->sender_(connections_[sock]);
            }
        }
    }
    void Run()
    {
        while (true)
        {
            Dispatcher();
        }
    }
    ~TcpServer()
    {
        if (listensock_ != -1)
            close(listensock_);
        if (epfd_ != -1)
            close(epfd_);
        delete[] revs_;
    }

private:
    static const int revs_num = 64;
    // 1. 網(wǎng)絡(luò)socket
    int listensock_;
    // 2. epoll
    int epfd_;
    // 3. 將epoll和上層代碼進(jìn)行結(jié)合
    std::unordered_map<int, Connection *> connections_;
    // 4. 就緒事件列表
    struct epoll_event *revs_;
    // 5. 設(shè)置完整報(bào)文的處理方法
    callback_t cb_;
};

Epoller.hpp文件:

#pragma once
#include <iostream>
#include <cerrno>
#include <cstdlib>
#include <unistd.h>
#include <sys/epoll.h>
#include "Log.hpp"

class Epoller
{
public:
    static const int gsize = 128;
public:
    static int CreateEpoller()
    {
        int epfd = epoll_create(gsize);
        if (epfd < 0)
        {
            logMessage(FATAL, "epoll_create : %d : %s", errno, strerror(errno));
            exit(3);
        }
        return epfd;
    }
    static bool AddEvent(int epfd, int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = sock;
        int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
        return n == 0;
    }
    static bool ModEvent(int epfd, int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = sock;
        int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);
        return n == 0;
    }
    static bool DelEvent(int epfd, int sock)
    {
        int n = epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
        return n == 0;
    }
    static int LoopOnce(int epfd, struct epoll_event revs[], int num)
    {
        int n = epoll_wait(epfd, revs, num, -1);
        if(n == -1)
        {
            logMessage(FATAL, "epoll_wait : %d : %s", errno, strerror(errno));
        }
        return n;
    }
};

Log.hpp文件:

#pragma once

#include <cstdio>
#include <ctime>
#include <cstdarg>
#include <cassert>
#include <cassert>
#include <cstring>
#include <cerrno>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define DEBUG 0
#define NOTICE 1
#define WARNING 2
#define FATAL 3

const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};

#define LOGFILE "serverTcp.log"

class Log
{
public:
    Log():logFd(-1)
    {}
    void enable()
    {
        umask(0);
        logFd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
        assert(logFd != -1);
        dup2(logFd, 1);
        dup2(logFd, 2);
    }
    ~Log()
    {
        if(logFd != -1) 
        {
            fsync(logFd);
            close(logFd);
        }
    }
private:
    int logFd;
};

// logMessage(DEBUG, "%d", 10);
void logMessage(int level, const char *format, ...)
{
    assert(level >= DEBUG);
    assert(level <= FATAL);

    char *name = getenv("USER");

    char logInfo[1024];
    va_list ap; // ap -> char*
    va_start(ap, format);

    vsnprintf(logInfo, sizeof(logInfo) - 1, format, ap);

    va_end(ap); // ap = NULL

    // 每次打開太麻煩
    FILE *out = (level == FATAL) ? stderr : stdout;
    fprintf(out, "%s | %u | %s | %s\n",
            log_level[level],
            (unsigned int)time(nullptr),
            name == nullptr ? "unknow" : name,
            logInfo);

    fflush(out); // 將C緩沖區(qū)中的數(shù)據(jù)刷新到OS
    fsync(fileno(out));   // 將OS中的數(shù)據(jù)盡快刷盤

}

Protocol.hpp文件:

#pragma once

#include <iostream>
#include <vector>
#include <cstring>
#include <string>
#include <cstdio>

#define SEP 'X'
#define SEP_LEN sizeof(SEP)

#define CRLF "\r\n"
#define CRLF_LEN strlen(CRLF) // 坑:sizeof(CRLF)
#define SPACE " "
#define SPACE_LEN strlen(SPACE)

// bbbXcc
void PackageSplit(std::string &inbuffer, std::vector<std::string> *result)
{
    while (true)
    {
        std::size_t pos = inbuffer.find(SEP);
        if (pos == std::string::npos)
            break;
        result->push_back(inbuffer.substr(0, pos));
        inbuffer.erase(0, pos + SEP_LEN);
    }
}

struct Request
{
    int x;
    int y;
    char op;
};

struct Response
{
    int code;
    int result;
};

bool Parser(std::string &in, Request *req)
{
    // 1 + 1, 2 * 4, 5 * 9, 6 *1
    std::size_t spaceOne = in.find(SPACE);
    if (std::string::npos == spaceOne)
        return false;
    std::size_t spaceTwo = in.rfind(SPACE);
    if (std::string::npos == spaceTwo)
        return false;

    std::string dataOne = in.substr(0, spaceOne);
    std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);
    std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));
    if (oper.size() != 1)
        return false;

    // 轉(zhuǎn)成內(nèi)部成員
    req->x = atoi(dataOne.c_str());
    req->y = atoi(dataTwo.c_str());
    req->op = oper[0];

    return true;
}

void Serialize(const Response &resp, std::string *out)
{
    // "exitCode_ result_"
    std::string ec = std::to_string(resp.code);
    std::string res = std::to_string(resp.result);

    *out = ec;
    *out += SPACE;
    *out += res;
    *out += CRLF;
}

Service.hpp文件:

#pragma once

#include "Protocol.hpp"
#include <functional>

using service_t = std::function<Response (const Request &req)>;

static Response calculator(const Request &req)
{
    Response resp = {0, 0};
    switch (req.op)
    {
    case '+':
        resp.result = req.x + req.y;
        break;
    case '-':
        resp.result = req.x - req.y;
        break;
    case '*':
        resp.result = req.x * req.y;
        break;
    case '/':
    { // x_ / y_
        if (req.y == 0)
            resp.code = -1; // -1. 除0
        else
            resp.result = req.x / req.y;
    }
    break;
    case '%':
    { // x_ / y_
        if (req.y == 0)
            resp.code = -2; // -2. 模0
        else
            resp.result = req.x % req.y;
    }
    break;
    default:
        resp.code = -3; // -3: 非法操作符
        break;
    }

    return resp;
}

Sock.hpp文件:

#pragma once

#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>

class Sock
{
public:
    static const int gbacklog = 20;

    static int Socket()
    {
        int listenSock = socket(PF_INET, SOCK_STREAM, 0);
        if (listenSock < 0)
        {
            exit(1);
        }
        int opt = 1;
        setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listenSock;
    }
    static void Bind(int socket, uint16_t port)
    {
        struct sockaddr_in local; // 用戶棧
        memset(&local, 0, sizeof local);
        local.sin_family = PF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;

        // 2.2 本地socket信息,寫入sock_對(duì)應(yīng)的內(nèi)核區(qū)域
        if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0)
        {
            exit(2);
        }
    }
    static void Listen(int socket)
    {
        if (listen(socket, gbacklog) < 0)
        {
            exit(3);
        }
    }

    static int Accept(int socket, std::string *clientip, uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);

        int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);
        if (serviceSock < 0)
        {
            // 獲取鏈接失敗
            return -1;
        }
        if(clientport) *clientport = ntohs(peer.sin_port);
        if(clientip) *clientip = inet_ntoa(peer.sin_addr);
        return serviceSock;
    }
};

Util.hpp文件:

#pragma once

#include <iostream>
#include <string>
#include <unistd.h>
#include <fcntl.h>

class Util
{
public:
    static void SetNonBlock(int fd)
    {
        int fl = fcntl(fd, F_GETFL);
        fcntl(fd, F_SETFL, fl | O_NONBLOCK);
    }
};

main.cc文件:

#include "TcpServer.hpp"
#include "Service.hpp"
#include <memory>

using namespace std;

static void usage(std::string process)
{
    cerr << "\nUsage: " << process << " port\n"
         << endl;
}
int BeginHandler(Connection *conn, std::string &message, service_t service)
{
    // 我們能保證,message一定是一個(gè)完整的報(bào)文,因?yàn)槲覀円呀?jīng)對(duì)它進(jìn)行了解碼
    Request req;
    // 反序列化,進(jìn)行處理的問(wèn)題
    if (!Parser(message, &req))
    {
        // 寫回錯(cuò)誤消息
        return -1;
        // 可以直接關(guān)閉連接
        // conn->excepter_(conn);
    }
    // 業(yè)務(wù)邏輯
    Response resp = service(req);

    std::cout << req.x << " " << req.op << " " << req.y << std::endl;
    std::cout << resp.code << " " << resp.result << std::endl;

    // 序列化
    std::string sendstr;
    Serialize(resp, &sendstr);

    // 處理完畢的結(jié)果,發(fā)送回給client
    conn->outbuffer_ += sendstr;
    conn->sender_(conn);
    if(conn->outbuffer_.empty()) conn->R_->EnableReadWrite(conn->sock_, true, false);
    else conn->R_->EnableReadWrite(conn->sock_, true, true);

    std::cout << "這里就是上次的業(yè)務(wù)邏輯啦 --- end" << std::endl;

    return 0;
}

// 1 + 1X2 + 3X5 + 6X8 -> 1 + 1
int HandlerRequest(Connection *conn, std::string &message)
{
    return BeginHandler(conn, message, calculator);
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        usage(argv[0]);
        exit(0);
    }
    // http.XXX("GET", "/aaa");
    unique_ptr<TcpServer> svr(new TcpServer(HandlerRequest, atoi(argv[1])));
    svr->Run();

    return 0;
}


Makefile文件:

server:main.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -f server

2.2.epoll?ET服務(wù)器源代碼講解

2.2.1.設(shè)計(jì)思路

epoll ET服務(wù)器:

在epoll ET服務(wù)器中,我們需要處理如下幾種事件:

??讀事件:如果是監(jiān)聽套接字的讀事件就緒則調(diào)用accept函數(shù)獲取底層的連接,如果是其他套接字的讀事件就緒則調(diào)用recv函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù)。
??寫事件:寫事件就緒則將待發(fā)送的數(shù)據(jù)寫入到發(fā)送緩沖區(qū)當(dāng)中。
??異常事件:當(dāng)某個(gè)套接字的異常事件就緒時(shí)我們不做過(guò)多處理,直接關(guān)閉該套接字。
當(dāng)epoll ET服務(wù)器監(jiān)測(cè)到某一事件就緒后,就會(huì)將該事件交給對(duì)應(yīng)的服務(wù)處理程序進(jìn)行處理。

Reactor模式的五個(gè)角色:

在這個(gè)epoll ET服務(wù)器中,Reactor模式中的五個(gè)角色對(duì)應(yīng)如下:

??句柄:文件描述符。
??同步事件分離器:I/O多路復(fù)用epoll。
??事件處理器:包括讀回調(diào)、寫回調(diào)和異?;卣{(diào)。
??具體事件處理器:讀回調(diào)、寫回調(diào)和異常回調(diào)的具體實(shí)現(xiàn)。
??初始分發(fā)器:TcpServer類當(dāng)中的Dispatcher函數(shù)。
Dispatcher函數(shù)要做的就是調(diào)用epoll_wait函數(shù)等待事件發(fā)生,當(dāng)有事件發(fā)生后就將就緒的事件派發(fā)給對(duì)應(yīng)的服務(wù)處理程序即可。

Connection類:

??在Reactor的工作流程中說(shuō)到,在注冊(cè)事件處理器時(shí)需要將其與Handle關(guān)聯(lián),本質(zhì)上就是需要將讀回調(diào)、寫回調(diào)和異?;卣{(diào)與某個(gè)文件描述符關(guān)聯(lián)起來(lái)。
??這樣做的目的就是為了當(dāng)某個(gè)文件描述符上的事件就緒時(shí)可以找到其對(duì)應(yīng)的各種回調(diào)函數(shù),進(jìn)而執(zhí)行對(duì)應(yīng)的回調(diào)方法來(lái)處理該事件。
所以我們可以設(shè)計(jì)一個(gè)Connection類,該類當(dāng)中的成員就包括一個(gè)文件描述符,以及該文件描述符對(duì)應(yīng)的各種回調(diào)函數(shù),此外還有一些其他成員,后面實(shí)現(xiàn)的時(shí)候再做詳細(xì)論述。

TcpServer類:

??在Reactor的工作流程中說(shuō)到,當(dāng)所有事件處理器注冊(cè)完畢后,會(huì)使用同步事件分離器等待這些事件發(fā)生,當(dāng)某個(gè)事件處理器的Handle變?yōu)镽eady狀態(tài)時(shí),同步事件分離器會(huì)通知初始分發(fā)器,然后初始分發(fā)器會(huì)將Ready狀態(tài)的Handle作為key來(lái)尋找其對(duì)應(yīng)的事件處理器,并調(diào)用該事件處理器中對(duì)應(yīng)的回調(diào)方法來(lái)響應(yīng)該事件。
??本質(zhì)就是當(dāng)事件注冊(cè)完畢后,會(huì)調(diào)用epoll_wait函數(shù)來(lái)等待這些事件發(fā)生,當(dāng)某個(gè)事件就緒時(shí)epoll_wait函數(shù)會(huì)告知調(diào)用方,然后調(diào)用方就根據(jù)就緒的文件描述符來(lái)找到其對(duì)應(yīng)的各種回調(diào)函數(shù),并調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)進(jìn)行事件處理。
對(duì)此我們可以設(shè)計(jì)一個(gè)TcpServer類。

??該類當(dāng)中有一個(gè)成員函數(shù)叫做Dispatcher,這個(gè)函數(shù)就是所謂的初始分發(fā)器,在該函數(shù)內(nèi)部會(huì)調(diào)用epoll_wait函數(shù)等待事件的發(fā)生,當(dāng)事件發(fā)生后會(huì)告知Dispatcher已經(jīng)就緒的事件。
??當(dāng)事件就緒后需要根據(jù)就緒的文件描述符來(lái)找到其對(duì)應(yīng)的各種回調(diào)函數(shù),由于我們會(huì)將每個(gè)文件描述符及其對(duì)應(yīng)的各種回調(diào)都封裝到一個(gè)Connection結(jié)構(gòu)當(dāng)中,所以實(shí)際我們就是需要根據(jù)文件描述符找到其對(duì)應(yīng)的Connection結(jié)構(gòu)。
??我們可以使用C++ STL當(dāng)中的unordered_map,來(lái)建立各個(gè)文件描述符與其對(duì)應(yīng)的Connection結(jié)構(gòu)之間的映射,這個(gè)unordered_map可以作為TcpServer類的一個(gè)成員變量,當(dāng)需要找某個(gè)文件描述符的Connection結(jié)構(gòu)時(shí)就可以通過(guò)該成員變量找到。
此外,在TcpServer類當(dāng)中還有一些其他成員,后面實(shí)現(xiàn)的時(shí)候再做詳細(xì)論述。

epoll ET服務(wù)器的工作流程:

??這個(gè)epoll ET服務(wù)器在Reactor模式下的工作流程如下:

首先epoll ET服務(wù)器需要進(jìn)行套接字的創(chuàng)建、綁定和監(jiān)聽。
??然后定義一個(gè)TcpServer對(duì)象并初始化,初始化時(shí)要做的就是創(chuàng)建epoll模型。
??緊接著需要為監(jiān)聽套接字創(chuàng)建對(duì)應(yīng)的Connection結(jié)構(gòu),并調(diào)用TcpServer類中提供的AddConnection函數(shù)將監(jiān)聽套接字添加到epoll模型中,并建立監(jiān)聽套接字與其對(duì)應(yīng)的Connection結(jié)構(gòu)之間的映射關(guān)系。
??之后就可以不斷調(diào)用TcpServer類中的Dispatcher函數(shù)進(jìn)行事件派發(fā)。
在事件處理過(guò)程中,會(huì)不斷向Dispatcher當(dāng)中新增或刪除事件,而每個(gè)事件就緒時(shí)都會(huì)自動(dòng)調(diào)用其對(duì)應(yīng)的回調(diào)函數(shù)進(jìn)行處理,所以我們要做的就是不斷調(diào)用Dispatcher函數(shù)進(jìn)行事件派發(fā)即可。

2.2.2.Connection結(jié)構(gòu)

Connection結(jié)構(gòu)中除了包含文件描述符和其對(duì)應(yīng)的讀回調(diào)、寫回調(diào)和異?;卣{(diào)之外,還包含一個(gè)輸入緩沖區(qū)inbuffer、一個(gè)輸出緩沖區(qū)outbuffer以及一個(gè)回指指針R。

??當(dāng)某個(gè)文件描述符的讀事件就緒時(shí),我們會(huì)調(diào)用recv函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù),但我們并不能保證我們讀取到了一個(gè)完整的報(bào)文,因此需要將讀取到的數(shù)據(jù)暫時(shí)存放到該文件描述符對(duì)應(yīng)的inbuffer當(dāng)中,當(dāng)inbuffer當(dāng)中可以分離出一個(gè)完整的報(bào)文后再將其分離出來(lái)進(jìn)行數(shù)據(jù)處理,這里的inbuffer本質(zhì)就是用來(lái)解決粘包問(wèn)題的。
??當(dāng)處理完一個(gè)報(bào)文請(qǐng)求后,需要將響應(yīng)數(shù)據(jù)發(fā)送給客戶端,但我們并不能保證底層TCP的發(fā)送緩沖區(qū)中有足夠的空間供我們寫入,因此需要將要發(fā)送的數(shù)據(jù)暫時(shí)存放到該文件描述符對(duì)應(yīng)的outbuffer當(dāng)中,當(dāng)?shù)讓覶CP的發(fā)送緩沖區(qū)中有空間,即寫事件就緒時(shí),再依次發(fā)送outbuffer當(dāng)中的數(shù)據(jù)。
??Connection結(jié)構(gòu)當(dāng)中設(shè)置回指指針R,便于快速找到我們定義的TcpServer對(duì)象,因?yàn)楹罄m(xù)我們需要根據(jù)Connection結(jié)構(gòu)找到這個(gè)TcpServer對(duì)象。比如當(dāng)連接事件就緒時(shí),需要調(diào)用TcpServer類當(dāng)中的AddEvent函數(shù)將其添加到Dispatcher當(dāng)中。
此外,Connection結(jié)構(gòu)當(dāng)中需要提供管理回調(diào)的成員函數(shù),便于外部對(duì)Connection結(jié)構(gòu)當(dāng)中的各種回調(diào)進(jìn)行設(shè)置。

2.2.3.TcpServer類

在TcpServer類當(dāng)中有一個(gè)unordered_map成員,用于建立文件描述符和與其對(duì)應(yīng)的Connection結(jié)構(gòu)之間的映射,還有一個(gè)epfd成員,該成員是epoll模型對(duì)應(yīng)的文件描述符。

在初始化TcpServer對(duì)象的時(shí)候就可以調(diào)用epoll_create函數(shù)創(chuàng)建epoll模型,并將該epoll模型對(duì)應(yīng)的文件描述符用epfd成員記錄下來(lái),便于后續(xù)使用。
Reactor對(duì)象在析構(gòu)的時(shí)候,需要調(diào)用close函數(shù)將該epoll模型進(jìn)行關(guān)閉。

Dispatcher函數(shù)(事件分派器):
TcpServer類當(dāng)中的Dispatcher函數(shù)就是之前所說(shuō)的初始分發(fā)器,這里我們更形象的將其稱之為事件分派器。

??事件分派器要做的就是調(diào)用epoll_wait函數(shù)等待事件發(fā)生。
??當(dāng)某個(gè)文件描述符上的事件發(fā)生后,先通過(guò)unordered_map找到該文件描述符對(duì)應(yīng)的Connection結(jié)構(gòu),然后調(diào)用Connection結(jié)構(gòu)當(dāng)中對(duì)應(yīng)的回調(diào)函數(shù)對(duì)該事件進(jìn)行處理即可。

說(shuō)明一下:

??這里沒(méi)有用switch或if語(yǔ)句對(duì)epoll_wait函數(shù)的返回值進(jìn)行判斷,而是借用for循環(huán)對(duì)其返回值進(jìn)行了判斷。
??如果epoll_wait的返回值為-1則說(shuō)明epoll_wait函數(shù)調(diào)用失敗,此時(shí)不會(huì)進(jìn)入到for循環(huán)內(nèi)部進(jìn)行事件處理。
??如果epoll_wait的返回值為0則說(shuō)明epoll_wait函數(shù)超時(shí)返回,此時(shí)也不會(huì)進(jìn)入到for循環(huán)內(nèi)部進(jìn)行事件處理。
??如果epoll_wait的返回值大于0則說(shuō)明epoll_wait函數(shù)調(diào)用成功,此時(shí)才會(huì)進(jìn)入到for循環(huán)內(nèi)部調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)對(duì)事件進(jìn)行處理。
??事件處理時(shí)最好先對(duì)異常事件進(jìn)行處理,因此代碼中將異常事件的判斷放在了最前面。

AddConnection函數(shù):

TcpServer類當(dāng)中的AddConnection函數(shù)是用于進(jìn)行事件注冊(cè)的。

??在注冊(cè)事件時(shí)需要傳入一個(gè)文件描述符和一個(gè)事件集合,表示需要監(jiān)視哪個(gè)文件描述符上的哪些事件。
??還需要傳入該文件描述符對(duì)應(yīng)的Connection結(jié)構(gòu),表示當(dāng)該文件描述符上的事件就緒后應(yīng)該執(zhí)行的回調(diào)方法。
??在AddEvent函數(shù)內(nèi)部要做的就是,調(diào)用epoll_ctl函數(shù)將該文件描述符及其對(duì)應(yīng)的事件集合注冊(cè)到epoll模型當(dāng)中,然后建立該文件描述符與其對(duì)應(yīng)的EventItem結(jié)構(gòu)的映射關(guān)系。

EnableReadWrite函數(shù):

Reactor類當(dāng)中的EnableReadWrite函數(shù),用于使能或使能某個(gè)文件描述符的讀寫事件。

??調(diào)用EnableReadWrite函數(shù)時(shí)需要傳入一個(gè)文件描述符,表示需要設(shè)置的是哪個(gè)文件描述符對(duì)應(yīng)的事件。
??還需要傳入兩個(gè)bool值,分別表示需要使能還是使能讀寫事件。
??EnableReadWrite函數(shù)內(nèi)部會(huì)調(diào)用epoll_ctl函數(shù)修改將該文件描述符的監(jiān)聽事件。

2.2.4.回調(diào)函數(shù)

下面我們就可以實(shí)現(xiàn)一些回調(diào)函數(shù),這里主要實(shí)現(xiàn)四個(gè)回調(diào)函數(shù)。

??Accepter:當(dāng)連接事件到來(lái)時(shí)可以調(diào)用該回調(diào)函數(shù)獲取底層建立好的連接。
??TcpRecver:當(dāng)讀事件就緒時(shí)可以調(diào)用該回調(diào)函數(shù)讀取客戶端發(fā)來(lái)的數(shù)據(jù)并進(jìn)行處理。
??TcpSender:當(dāng)寫事件就緒時(shí)可以調(diào)用該回調(diào)函數(shù)向客戶端發(fā)送響應(yīng)數(shù)據(jù)。
??TcpExcepter:當(dāng)異常事件就緒時(shí)可以調(diào)用該函數(shù)將對(duì)應(yīng)的文件描述符進(jìn)行關(guān)閉。
當(dāng)我們?yōu)槟硞€(gè)文件描述符創(chuàng)建Connection結(jié)構(gòu)時(shí),就可以調(diào)用Connection類提供的回調(diào)設(shè)置函數(shù)(SetRecver、SetSender、SetExcepter),將這些回調(diào)函數(shù)到Connection結(jié)構(gòu)當(dāng)中。

??我們會(huì)將監(jiān)聽套接字對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中的recver_設(shè)置為Accepter,因?yàn)楸O(jiān)聽套接字的讀事件就緒就意味著連接事件就緒了,而監(jiān)聽套接字一般只關(guān)心讀事件,因此監(jiān)聽套接字對(duì)應(yīng)的sender_和excepter_可以設(shè)置為nullptr。
??當(dāng)Dispatcher監(jiān)測(cè)到監(jiān)聽套接字的讀事件就緒時(shí),會(huì)調(diào)用監(jiān)聽套接字對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中的recver_回調(diào),此時(shí)就會(huì)調(diào)用Accepter回調(diào)獲取底層建立好的連接。
??而對(duì)于與客戶端建立連接的套接字,我們會(huì)將其對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中的recver_、sender_和excepter_分別設(shè)置為這里的TcpRecver、TcpSender和TcpExcepter。
??當(dāng)Dispatcher監(jiān)測(cè)到這些套接字的事件就緒時(shí),就會(huì)調(diào)用其對(duì)應(yīng)的Connection結(jié)構(gòu)當(dāng)中對(duì)應(yīng)的回調(diào)函數(shù),也就是這里的TcpRecver、TcpSender和TcpExcepter。

Accepter回調(diào):

Accepter回調(diào)用于處理連接事件,其工作流程如下:

1.調(diào)用accept函數(shù)獲取底層建立好的連接。
2.將獲取到的套接字設(shè)置為非阻塞,并為其創(chuàng)建Connection結(jié)構(gòu),填充Connection結(jié)構(gòu)當(dāng)中的各個(gè)字段,并注冊(cè)該套接字相關(guān)的回調(diào)方法。
3.將該套接字及其對(duì)應(yīng)需要關(guān)心的事件注冊(cè)到Dispatcher當(dāng)中。
下一次Dispatcher在進(jìn)行事件派發(fā)時(shí)就會(huì)幫我們關(guān)注該套接字對(duì)應(yīng)的事件,當(dāng)事件就緒時(shí)就會(huì)執(zhí)行該套接字對(duì)應(yīng)的Connection結(jié)構(gòu)中對(duì)應(yīng)的回調(diào)方法。

需要注意的是,因?yàn)檫@里實(shí)現(xiàn)的ET模式下的epoll服務(wù)器,因此在獲取底層連接時(shí)需要循環(huán)調(diào)用accept函數(shù)進(jìn)行讀取,并且監(jiān)聽套接字必須設(shè)置為非阻塞。

??因?yàn)镋T模式下只有當(dāng)?shù)讓咏⒌倪B接從無(wú)到有或是從有到多時(shí)才會(huì)通知上層,如果沒(méi)有一次性將底層建立好的連接全部獲取,并且此后再也沒(méi)有建立好的連接,那么底層沒(méi)有讀取完的連接就相當(dāng)于丟失了,所以需要循環(huán)多次調(diào)用accept函數(shù)獲取底層建立好的連接。
??循環(huán)調(diào)用accept函數(shù)也就意味著,當(dāng)?shù)讓舆B接全部被獲取后再調(diào)用accept函數(shù),此時(shí)就會(huì)因?yàn)榈讓右呀?jīng)沒(méi)有連接了而被阻塞住,因此需要將監(jiān)聽套接字設(shè)置為非阻塞,這樣當(dāng)?shù)讓記](méi)有連接時(shí)accept就會(huì)返回,而不會(huì)被阻塞住。
accept獲取到的新的套接字也需要設(shè)置為非阻塞,就是為了避免將來(lái)循環(huán)調(diào)用recv、send等函數(shù)時(shí)被阻塞。

設(shè)置文件描述符為非阻塞:

設(shè)置文件描述符為非阻塞時(shí),需要先調(diào)用fcntl函數(shù)獲取該文件描述符對(duì)應(yīng)的文件狀態(tài)標(biāo)記,然后在該文件狀態(tài)標(biāo)記的基礎(chǔ)上添加非阻塞標(biāo)記O_NONBLOCK,最后調(diào)用fcntl函數(shù)對(duì)該文件描述符的狀態(tài)標(biāo)記進(jìn)行設(shè)置即可。

監(jiān)聽套接字設(shè)置為非阻塞后,當(dāng)?shù)讓舆B接不就緒時(shí),accept函數(shù)會(huì)以出錯(cuò)的形式返回,因此當(dāng)調(diào)用accept函數(shù)的返回值小于0時(shí),需要繼續(xù)判斷錯(cuò)誤碼。

??如果錯(cuò)誤碼為EAGAIN或EWOULDBLOCK,說(shuō)明本次出錯(cuò)返回是因?yàn)榈讓右呀?jīng)沒(méi)有可獲取的連接了,此時(shí)底層連接全部獲取完畢,這時(shí)我們可以返回0,表示本次Accepter調(diào)用成功。
??如果錯(cuò)誤碼為EINTR,說(shuō)明本次調(diào)用accept函數(shù)獲取底層連接時(shí)被信號(hào)中斷了,這時(shí)還應(yīng)該繼續(xù)調(diào)用accept函數(shù)進(jìn)行獲取。
??除此之外,才說(shuō)明accept函數(shù)是真正調(diào)用失敗了,這時(shí)我們可以返回-1,表示本次Accepter調(diào)用失敗。

問(wèn)題:accept、recv和send等IO系統(tǒng)調(diào)用為什么會(huì)被信號(hào)中斷?

答:IO系統(tǒng)調(diào)用函數(shù)出錯(cuò)返回并且將錯(cuò)誤碼設(shè)置為EINTR,表明本次在進(jìn)行數(shù)據(jù)讀取或數(shù)據(jù)寫入之前被信號(hào)中斷了,也就是說(shuō)IO系統(tǒng)調(diào)用在陷入內(nèi)核,但并沒(méi)有返回用戶態(tài)的時(shí)候內(nèi)核跑去處理其他信號(hào)了。

??在內(nèi)核態(tài)返回用戶態(tài)之前會(huì)檢查信號(hào)的pending位圖,也就是未決信號(hào)集,如果pending位圖中有未處理的信號(hào),那么內(nèi)核就會(huì)對(duì)該信號(hào)進(jìn)行處理。
??但I(xiàn)O系統(tǒng)調(diào)用函數(shù)在進(jìn)行IO操作之前就被信號(hào)中斷了,這實(shí)際上是一個(gè)特例,因?yàn)镮O過(guò)程分為“等”和“拷貝”兩個(gè)步驟,而一般“等”的過(guò)程比較漫長(zhǎng),而在這個(gè)過(guò)程中我們的執(zhí)行流其實(shí)是處于閑置的狀態(tài)的,因此在“等”的過(guò)程中如果有信號(hào)產(chǎn)生,內(nèi)核就會(huì)立即進(jìn)行信號(hào)的處理。

寫事件是按需打開的:

這里調(diào)用accept獲取上來(lái)的套接字在添加到Dispatcher中時(shí),只添加了EOPLLIN和EPOLLET事件,也就是說(shuō)只讓epoll幫我們關(guān)心該套接字的讀事件。

??這里之所以沒(méi)有添加寫事件,是因?yàn)楫?dāng)前我們并沒(méi)有要發(fā)送的數(shù)據(jù),因此沒(méi)有必要讓epoll幫我們關(guān)心寫事件。
??一般讀事件是經(jīng)常會(huì)被設(shè)置的,而寫事件則是按序打開的,只有當(dāng)我們有數(shù)據(jù)要發(fā)送時(shí)才會(huì)將寫事件打開,并且在數(shù)據(jù)全部寫入完畢后又會(huì)立即將寫事件關(guān)閉。

注:

1.如果LT模式,一定是要先檢測(cè)有沒(méi)有對(duì)應(yīng)的空間(先打開對(duì)寫事件的關(guān)心,epoll會(huì)自動(dòng)進(jìn)行事件派發(fā)),然后才寫入。LT模式下,只要你打開了寫入,我們的代碼會(huì)自動(dòng)進(jìn)行調(diào)用TcpSender方法,進(jìn)行發(fā)送。
2.如果是ET模式,也可以采用上面的方法。不過(guò),一般ET我們追求高效,直接發(fā)送。通過(guò)發(fā)送是否全部發(fā)送完成,來(lái)決定是否要進(jìn)行打開對(duì)寫事件進(jìn)行關(guān)心:

a.先調(diào)用send函數(shù)發(fā)送,發(fā)完就完了。
b.如果沒(méi)有send發(fā)完,打開寫事件關(guān)心,讓epoll自動(dòng)幫我們進(jìn)行發(fā)送。
一般寫事件關(guān)心,不能常打開,一定是需要的時(shí)候,在進(jìn)行打開)不需要就要關(guān)閉對(duì)寫事件的關(guān)心。

TcpRecver回調(diào):

TcpRecver回調(diào)用于處理讀事件,其工作流程如下:

1.循環(huán)調(diào)用recv函數(shù)讀取數(shù)據(jù),并將讀取到的數(shù)據(jù)添加到該套接字對(duì)應(yīng)EventItem結(jié)構(gòu)的inbuffer當(dāng)中。
2.對(duì)inbuffer當(dāng)中的數(shù)據(jù)進(jìn)行切割,將完整的報(bào)文切割出來(lái),剩余的留在inbuffer當(dāng)中。
3.對(duì)切割出來(lái)的完整報(bào)文進(jìn)行反序列化。
4.業(yè)務(wù)處理。
5.業(yè)務(wù)處理后形成響應(yīng)報(bào)文。
6.將響應(yīng)報(bào)頭添加到對(duì)應(yīng)EventItem結(jié)構(gòu)的outbuffer當(dāng)中,并打開寫事件。
下一次Dispatcher在進(jìn)行事件派發(fā)時(shí)就會(huì)幫我們關(guān)注該套接字的寫事件,當(dāng)寫事件就緒時(shí)就會(huì)執(zhí)行該套接字對(duì)應(yīng)的EventItem結(jié)構(gòu)中寫回調(diào)方法,進(jìn)而將outbuffer中的響應(yīng)數(shù)據(jù)發(fā)送給客戶端。

數(shù)據(jù)讀?。?/strong>

??循環(huán)調(diào)用recv函數(shù)將讀取到的數(shù)據(jù)添加到inbuffer當(dāng)中。

??當(dāng)recv函數(shù)的返回值小于0時(shí)同樣需要進(jìn)一步判斷錯(cuò)誤碼,如果錯(cuò)誤碼為EAGAIN或EWOULDBLOCK則說(shuō)明底層數(shù)據(jù)讀取完畢了,如果錯(cuò)誤碼為EINTR則說(shuō)明讀取過(guò)程被信號(hào)中斷了,此時(shí)還需要繼續(xù)調(diào)用recv函數(shù)進(jìn)行讀取,否則就是讀取出錯(cuò)了。
??當(dāng)讀取出錯(cuò)時(shí)直接調(diào)用該套接字對(duì)應(yīng)的excepter_回調(diào),最終就會(huì)調(diào)用到下面將要實(shí)現(xiàn)的TcpExcepter回調(diào),在我們會(huì)在TcpExcepter回調(diào)當(dāng)中將該套接字進(jìn)行關(guān)閉。

報(bào)文切割:

報(bào)文切割本質(zhì)就是為了防止粘包問(wèn)題,而粘包問(wèn)題實(shí)際是涉及到協(xié)議定制的。

??因?yàn)槲覀冃枰鶕?jù)協(xié)議知道如何將各個(gè)報(bào)文進(jìn)行分離,比如UDP分離報(bào)文采用的就是定長(zhǎng)報(bào)頭+自描述字段。
??我們的目的是演示整個(gè)數(shù)據(jù)處理的過(guò)程,為了簡(jiǎn)單起見(jiàn)就不進(jìn)行過(guò)于復(fù)雜的協(xié)議定制了,這里我們就以“X”作為各個(gè)報(bào)文之間的分隔符,每個(gè)報(bào)文的最后都會(huì)以一個(gè)“X”作為報(bào)文結(jié)束的標(biāo)志。
??因此現(xiàn)在要做的就是以“X”作為分隔符對(duì)inbuffer當(dāng)中的字符串進(jìn)行切割,這里將這個(gè)過(guò)程封裝成一個(gè)PackageSplit函數(shù)。
??PackageSplit函數(shù)要做的就是對(duì)inbuffer當(dāng)中的字符串進(jìn)行切割,將切割出來(lái)的一個(gè)個(gè)報(bào)文放到vector當(dāng)中,對(duì)于最后無(wú)法切出完整報(bào)文的數(shù)據(jù)就留在inbuffer當(dāng)中即可。

反序列化:

在數(shù)據(jù)發(fā)送之前需要進(jìn)行序列化Serialize,接收到數(shù)據(jù)之后需要對(duì)數(shù)據(jù)進(jìn)行反序列化Parser。

??序列化就是將對(duì)象的狀態(tài)信息轉(zhuǎn)換為可以存儲(chǔ)或傳輸?shù)男问剑ㄗ止?jié)序列)的過(guò)程。
??反序列化就是把字節(jié)序列恢復(fù)為原對(duì)象的過(guò)程。
實(shí)際反序列化也是與協(xié)議定制相關(guān)的,假設(shè)這里的epoll服務(wù)器向客戶端提供的就是計(jì)算服務(wù),客戶端向服務(wù)器發(fā)來(lái)的都是需要服務(wù)器計(jì)算的計(jì)算表達(dá)式,因此可以用一個(gè)結(jié)構(gòu)體來(lái)描述這樣一個(gè)計(jì)算表達(dá)式,結(jié)構(gòu)體當(dāng)中包含兩個(gè)操作數(shù)x和y,以及一個(gè)操作符op。

此時(shí)這里所謂的反序列化就是將一個(gè)計(jì)算表達(dá)式轉(zhuǎn)換成這樣一個(gè)結(jié)構(gòu)體,

??因此現(xiàn)在要做的就是將形如“1 + 2”這樣的計(jì)算表達(dá)式轉(zhuǎn)換成一個(gè)結(jié)構(gòu)體,該結(jié)構(gòu)體當(dāng)中的x成員的值就是1,y的值就是2,op的值就是‘+’,這里將這個(gè)過(guò)程封裝成一個(gè)Parser函數(shù)。

說(shuō)明一下:?實(shí)際在做項(xiàng)目時(shí)不需要我們自己進(jìn)行序列化和反序列化,我們一般會(huì)直接用JSON或XML這樣的序列化反序列化工具。

業(yè)務(wù)處理:

業(yè)務(wù)處理就是服務(wù)器拿到客戶端發(fā)來(lái)的數(shù)據(jù)后,對(duì)數(shù)據(jù)進(jìn)行數(shù)據(jù)分析,最終拿到客戶端想要的資源。

??我們這里要做的業(yè)務(wù)處理非常簡(jiǎn)單,就是用反序列化后的數(shù)據(jù)進(jìn)行數(shù)據(jù)計(jì)算,此時(shí)得到的計(jì)算結(jié)果就是客戶端想要的。

形成響應(yīng)報(bào)文:

在業(yè)務(wù)處理后我們已經(jīng)拿到了客戶端想要的數(shù)據(jù),現(xiàn)在我們要做的就是形成響應(yīng)報(bào)文,由于我們這里規(guī)定每個(gè)報(bào)文都以“X”作為報(bào)文結(jié)束的標(biāo)志,因此在形成響應(yīng)報(bào)文的時(shí)候,就需要在每一個(gè)計(jì)算結(jié)果后面都添加上一個(gè)“X”,表示這是之前某一個(gè)請(qǐng)求報(bào)文的響應(yīng)報(bào)文,因?yàn)閰f(xié)議制定后就需要雙方遵守。

將響應(yīng)報(bào)文添加到outbuffer中:

響應(yīng)報(bào)文構(gòu)建完后需要將其添加到該套接字對(duì)應(yīng)的outbuffer中,并打開該套接字的寫事件,此后當(dāng)寫事件就緒時(shí)就會(huì)將outbuffer當(dāng)中的數(shù)據(jù)發(fā)送出去。

TcpSender回調(diào):

TcpSender回調(diào)用于處理寫事件,其工作流程如下:

1.循環(huán)調(diào)用send函數(shù)發(fā)送數(shù)據(jù),并將發(fā)送出去的數(shù)據(jù)從該套接字對(duì)應(yīng)Connection結(jié)構(gòu)的outbuffer中刪除。
2.如果循環(huán)調(diào)用send函數(shù)后該套接字對(duì)應(yīng)的outbuffer當(dāng)中的數(shù)據(jù)被全部發(fā)送,此時(shí)就需要將該套接字對(duì)應(yīng)的寫事件關(guān)閉,因?yàn)橐呀?jīng)沒(méi)有要發(fā)送的數(shù)據(jù)了,如果outbuffer當(dāng)中的數(shù)據(jù)還有剩余,那么該套接字對(duì)應(yīng)的寫事件就應(yīng)該繼續(xù)打開。

循環(huán)調(diào)用send函數(shù)發(fā)送數(shù)據(jù)的過(guò)程:

??循環(huán)調(diào)用send函數(shù)將outbuffer中的數(shù)據(jù)發(fā)送出去。
??當(dāng)send函數(shù)的返回值小于0時(shí)也需要進(jìn)一步判斷錯(cuò)誤碼,如果錯(cuò)誤碼為EAGAIN或EWOULDBLOCK則說(shuō)明底層TCP發(fā)送緩沖區(qū)已經(jīng)被寫滿了,這時(shí)需要將已經(jīng)發(fā)送的數(shù)據(jù)從outbuffer中移除。
??如果錯(cuò)誤碼為EINTR則說(shuō)明發(fā)送過(guò)程被信號(hào)中斷了,此時(shí)還需要繼續(xù)調(diào)用send函數(shù)進(jìn)行發(fā)送,否則就是發(fā)送出錯(cuò)了。
??當(dāng)發(fā)送出錯(cuò)時(shí)也直接調(diào)用該套接字對(duì)應(yīng)的excepter_回調(diào),最終就會(huì)調(diào)用到下面將要實(shí)現(xiàn)的TcpExcepter回調(diào),在我們會(huì)在TcpExcepter回調(diào)當(dāng)中將該套接字進(jìn)行關(guān)閉。
??如果最終outbuffer當(dāng)中的數(shù)據(jù)全部發(fā)送成功,則將outbuffer清空即可。

TcpExcepter回調(diào):

errorer回調(diào)用于處理異常事件。

??對(duì)于異常事件就緒的套接字我們這里不做其他過(guò)多的處理,簡(jiǎn)單的調(diào)用close函數(shù)將該套接字關(guān)閉即可。
??但是在關(guān)閉該套接字之前,需要先調(diào)用DelEvent函數(shù)將該套接字從epoll模型中刪除,并取消該套接字與其對(duì)應(yīng)的Connection結(jié)構(gòu)的映射關(guān)系。
??由于在Dispatcher當(dāng)中是先處理的異常事件,為了避免該套接字被關(guān)閉后繼續(xù)進(jìn)行讀寫操作,然后因?yàn)樽x寫操作失敗再次調(diào)用errorer回調(diào)重復(fù)關(guān)閉該文件描述符,因此在關(guān)閉該套接字后將其Connection當(dāng)中的文件描述符值設(shè)置為-1。
??在調(diào)用TcpRecver和TcpSender回調(diào)執(zhí)行讀寫操作之前,都會(huì)判斷該Connection結(jié)構(gòu)當(dāng)中的文件描述符值是否有效,如果無(wú)效則不會(huì)進(jìn)行后續(xù)操作。

2.2.5.套接字相關(guān)

這里可以編寫一個(gè)Socket類,對(duì)套接字相關(guān)的接口進(jìn)行一定程度的封裝,為了讓外部能夠直接調(diào)用Socket類當(dāng)中封裝的函數(shù),于是將這些函數(shù)定義成了靜態(tài)成員函數(shù)。

2.2.6.Reactor模式和Proactor模式

??Reactor:半異步半同步,Reactor (tcp)服務(wù)器是Linux服務(wù)最常用的(幾乎沒(méi)有之一),既負(fù)責(zé)事件派發(fā),又負(fù)責(zé)IO(負(fù)責(zé)業(yè)務(wù)邏輯的處理)
??Proactor:前攝模式,其他平臺(tái)可能出現(xiàn)的模式。只負(fù)責(zé)負(fù)責(zé)事件派發(fā),就緒的事件推送給后端的進(jìn)程、線程池,不關(guān)心負(fù)責(zé)IO(不負(fù)責(zé)業(yè)務(wù)邏輯的處理)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-481182.html

到了這里,關(guān)于Linux - 第25節(jié) - Linux高級(jí)IO(三)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 服務(wù)器IO復(fù)用reactor模式

    調(diào)試: Linux下nc命令作為客戶端: nc 127.0.0.1 7777

    2024年02月10日
    瀏覽(17)
  • 【Linux】高級(jí)IO

    【Linux】高級(jí)IO

    目錄 IO的基本概念 釣魚五人組 五種IO模型 高級(jí)IO重要概念 同步通信 VS?異步通信 阻塞 VS 非阻塞 其他高級(jí)IO 阻塞IO 非阻塞IO 什么是IO? I/O(input/output)也就是輸入和輸出,在著名的馮諾依曼體系結(jié)構(gòu)當(dāng)中,將數(shù)據(jù)從輸入設(shè)備拷貝到內(nèi)存就叫做輸入,將數(shù)據(jù)從內(nèi)存拷貝到輸出設(shè)

    2024年02月13日
    瀏覽(20)
  • Linux 高級(jí)IO

    Linux 高級(jí)IO

    小編是雙非本科大二菜鳥不贅述,歡迎米娜桑來(lái)指點(diǎn)江山哦 1319365055 ????非科班轉(zhuǎn)碼社區(qū)誠(chéng)邀您入駐???? 小伙伴們,滿懷希望,所向披靡,打碼一路向北 一個(gè)人的單打獨(dú)斗不如一群人的砥礪前行 這是和夢(mèng)想合伙人組建的社區(qū),誠(chéng)邀各位有志之士的加入??! 社區(qū)用戶好文

    2024年02月08日
    瀏覽(17)
  • 高級(jí)IO(Linux)

    高級(jí)IO(Linux)

    什么是IO? 就拿 read 系統(tǒng)調(diào)用來(lái)說(shuō),從緩沖區(qū)中讀取數(shù)據(jù);首先要保證緩沖區(qū)有數(shù)據(jù),若沒(méi)有,操作就會(huì)被阻塞,也就是等待資源就緒;若有,將數(shù)據(jù)拷貝完之后直接返回;所以,IO分為兩部分:等待資源就緒+拷貝數(shù)據(jù);其中等待資源就緒在整個(gè)IO過(guò)程中占比非常大,如何降

    2024年02月08日
    瀏覽(36)
  • Linux中的高級(jí)IO

    Linux中的高級(jí)IO

    1.IO 1.1基本介紹 IO實(shí)際上就是 input output 在馮諾依曼體系中就是與外設(shè)交互的意思,而我們的網(wǎng)絡(luò)通信本質(zhì)上也是一種IO。 1.2基礎(chǔ)io的低效性 為什么基礎(chǔ)io會(huì)低效呢?我們以讀取為例來(lái)介紹。 當(dāng)我們底層調(diào)用read函數(shù)的時(shí)候,如果緩沖區(qū)沒(méi)有數(shù)據(jù) ,我們就會(huì)將pcb放入等待隊(duì)列,

    2024年02月05日
    瀏覽(16)
  • IO進(jìn)程線程,文件與目錄,實(shí)現(xiàn)linux任意目錄下ls -la

    注意文件的名字、路徑是如何輸入的。 函數(shù)opendir打開目錄,struct dirent,struct stat這些結(jié)構(gòu)體的含義。? ????????readdir()函數(shù)是一個(gè)用于讀取目錄內(nèi)容的系統(tǒng)調(diào)用或庫(kù)函數(shù),在類Unix操作系統(tǒng)中(如Linux)廣泛使用。它用于遍歷目錄,并逐個(gè)獲取目錄中的條目(文件和子目錄

    2024年02月10日
    瀏覽(23)
  • 嵌入式Linux學(xué)習(xí)DAY21--目錄io

    嵌入式Linux學(xué)習(xí)DAY21--目錄io

    對(duì)主函數(shù)傳參: ./a.out +參數(shù)1(指針數(shù)組),參數(shù)2....... 在代碼中,要寫成?int?main(參數(shù)數(shù)量,const char *指針數(shù)組) lseek(a,b,c): ? ? ? ? ? ? ? ?功能:重新設(shè)定文件描述符的偏移量 ? ? ? ? ? ? ? ?參數(shù):a:文件描述符 ? ? ? ? ? ? ? ? ? ? ? ? ? b:偏移量 ????????

    2024年02月20日
    瀏覽(26)
  • Linux學(xué)習(xí)記錄——?? 高級(jí)IO(1)

    Linux學(xué)習(xí)記錄——?? 高級(jí)IO(1)

    其它IO類型的實(shí)現(xiàn)在這篇之后的三篇 input,output。調(diào)用read或recv接口時(shí),如果對(duì)方長(zhǎng)時(shí)間不向我方接收緩沖區(qū)拷貝數(shù)據(jù),我們的進(jìn)程就只能阻塞,這是讀取條件不滿足。阻塞的時(shí)間成本最后會(huì)體現(xiàn)在用戶上。因此可以說(shuō),IO = 等 + 數(shù)據(jù)拷貝。高效IO則是單位事件內(nèi),等的比重越

    2024年01月21日
    瀏覽(23)
  • 【Linux】高級(jí)IO --- 多路轉(zhuǎn)接,select,poll,epoll

    【Linux】高級(jí)IO --- 多路轉(zhuǎn)接,select,poll,epoll

    所有通過(guò)捷徑所獲取的快樂(lè),無(wú)論是金錢、性還是名望,最終都會(huì)給自己帶來(lái)痛苦 1. 后端服務(wù)器最常用的網(wǎng)絡(luò)IO設(shè)計(jì)模式其實(shí)就是Reactor,也稱為反應(yīng)堆模式,Reactor是單進(jìn)程,單線程的,但他能夠處理多客戶端向服務(wù)器發(fā)起的網(wǎng)絡(luò)IO請(qǐng)求,正因?yàn)樗菃螆?zhí)行流,所以他的成本就

    2024年02月09日
    瀏覽(26)
  • 【文件IO】Linux 文件操作(一) —— 遍歷指定目錄下的所有文件

    【文件IO】Linux 文件操作(一) —— 遍歷指定目錄下的所有文件

    目錄 一、訪問(wèn)目錄相關(guān)函數(shù) 1、打開/訪問(wèn)目錄 (opendir / fdopendir) 2、讀取目錄內(nèi)容 (readdir) 3、關(guān)閉目錄 (closedir) 二、遍歷指定目錄下的所有文件 opendir / fdopendir 函數(shù)的作用是訪問(wèn)指定路徑的目錄,函數(shù)聲明如下: (1) opendir opendir 函數(shù)是通過(guò)用戶提供的目錄路徑來(lái)訪問(wèn)目錄, 參

    2024年02月04日
    瀏覽(23)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包