国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

服務器IO復用reactor模式

這篇具有很好參考價值的文章主要介紹了服務器IO復用reactor模式。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <functional>
#include <future>
#include <algorithm>

const int MAX_CLIENTS = 10;
const int BUFFER_SIZE = 1024;
const int MAX_THREADS = 4;

struct EventData
{
    int clientSocket;
};

class ThreadPool
{
public:
    ThreadPool(size_t numThreads)
    {
        for (size_t i = 0; i < numThreads; ++i)
        {
            threads_.emplace_back([this]
                                  {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

                        if (stop_ && tasks_.empty()) {
                            return;
                        }

                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    //std::cout << "task(); " << std::endl;
                    task();
                } });
        }
    }

    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            stop_ = true;
        }
        condition_.notify_all();

        for (std::thread &thread : threads_)
        {
            thread.join();
        }
    }

    void Enqueue(std::function<void()> func)
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            tasks_.emplace(std::move(func));
        }
        condition_.notify_one();
    }

private:
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;

    std::mutex mutex_;
    std::condition_variable condition_;
    bool stop_ = false;
};

class Reactor
{
public:
    Reactor(ThreadPool &threadPool) : threadPool_(threadPool)
    {
    }

    void Register(int clientSocket)
    {
        std::cout << "Register " << std::endl;
        //std::lock_guard<std::mutex> lock(mutex_);
        std::cout << "Register2 " << std::endl;
        clientSockets_.push_back(clientSocket);
        std::cout << "Socket " << clientSocket << " registered." << std::endl;
    }

    void Remove(int clientSocket)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        clientSockets_.erase(std::remove_if(clientSockets_.begin(), clientSockets_.end(),
                                            [clientSocket](int socket)
                                            { return socket == clientSocket; }),
                             clientSockets_.end());
        close(clientSocket);
        FD_CLR(clientSocket, &readFds);
        std::cout << "Socket " << clientSocket << " removed." << std::endl;
        clientSocket = 0;
    }

    void Run(int mainSocket)
    {
        int maxFd = mainSocket;

        while (true)
        {
            FD_ZERO(&readFds);
            FD_SET(mainSocket, &readFds);

            {
                std::unique_lock<std::mutex> lock(mutex_);

                for (int socket : clientSockets_)
                {
                    if (socket > 0)
                    {
                        FD_SET(socket, &readFds);
                        maxFd = std::max(maxFd, socket);
                    }
                }

                struct timeval timeout;
                timeout.tv_sec = 0;
                timeout.tv_usec = 0.01;

                int selectResult = select(maxFd + 1, &readFds, nullptr, nullptr, &timeout);
                if (selectResult == -1)
                {
                    perror("select");
                    return;
                }
                else if (selectResult == 0)
                {
                    // 沒有就緒套接字,繼續(xù)事件循環(huán)
                    continue;
                }

                if (FD_ISSET(mainSocket, &readFds))
                {
                    // 有新的連接請求
                    struct sockaddr_in clientAddress;
                    socklen_t clientAddressLength = sizeof(clientAddress);
                    int clientSocket = accept(mainSocket, (struct sockaddr *)&clientAddress, &clientAddressLength);

                    if (clientSocket == -1)
                    {
                        if (errno == EWOULDBLOCK)
                        {
                            // 沒有新連接,繼續(xù)事件循環(huán)
                            continue;
                        }
                        else
                        {
                            perror("accept");
                            break;
                        }
                    }
                    else
                    {
                        std::cout << "Accepted new connection." << std::endl;

                        // 將客戶端套接字添加到客戶端套接字數(shù)組

                        Register(clientSocket);
                        std::cout << "Register1 " << std::endl;
                    }
                }

                // for (int i = 0; i < MAX_CLIENTS; ++i)
                // {
                //     if (FD_ISSET(clientSockets[i], &readFds))
                //     {
                //         readySockets.push_back(clientSockets[i]);
                //     }
                // }
            }

            for (int socket : clientSockets_)
            {
                if (FD_ISSET(socket, &readFds))
                {
                    threadPool_.Enqueue([this, socket]()
                                        {
                    EventData eventData;
                    eventData.clientSocket = socket;
                    ProcessEvent(eventData); });
                }
            }
        }
    }

private:
    void ProcessEvent(EventData eventData)
    {
        // 處理事件,這里只是簡單示例,回傳客戶端的數(shù)據(jù)
        char buffer[BUFFER_SIZE];
        memset(buffer, 0, sizeof(BUFFER_SIZE));
        ssize_t bytesRead = recv(eventData.clientSocket, buffer, BUFFER_SIZE, 0);
        if (bytesRead > 0)
        {
            send(eventData.clientSocket, buffer, bytesRead, 0);
        }
        else if (bytesRead == 0 || (bytesRead == -1 && errno != EWOULDBLOCK))
        {
            // 連接關閉或出錯,移除客戶端
            Remove(eventData.clientSocket);
        }
        // 通知事件已處理
        std::cout << "Processed socket " << eventData.clientSocket << "  event data " << buffer << "in Thread: " << std::this_thread::get_id() << std::endl;
    }

private:
    ThreadPool &threadPool_;
    std::mutex mutex_;
    std::vector<int> clientSockets_;
    fd_set readFds;
};

class Server
{
public:
    Server(ThreadPool &threadPool) : reactor_(threadPool)
    {
    }

    bool Init(int port)
    {
        mainSocket_ = socket(AF_INET, SOCK_STREAM, 0);
        if (mainSocket_ == -1)
        {
            std::cerr << "creat socket err" << std::endl;
            return false;
        }

        int opt = 1;
        if (setsockopt(mainSocket_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1)
        {
            std::cerr << "setsockopt err" << std::endl;
            close(mainSocket_);
            return false;
        }

        int flags = fcntl(mainSocket_, F_GETFL, 0);
        if (flags == -1)
        {
            std::cerr << "Error getting socket flags." << std::endl;
            close(mainSocket_);
            return false;
        }
        if (fcntl(mainSocket_, F_SETFL, flags | O_NONBLOCK) == -1)
        {
            std::cerr << "Error setting socket to non-blocking mode." << std::endl;
            close(mainSocket_);
            return false;
        }

        struct sockaddr_in serverAddress;
        serverAddress.sin_family = AF_INET;
        serverAddress.sin_addr.s_addr = INADDR_ANY;
        serverAddress.sin_port = htons(port);

        if (bind(mainSocket_, (struct sockaddr *)&serverAddress, sizeof(serverAddress)) == -1)
        {
            perror("bind");
            return false;
        }

        if (listen(mainSocket_, MAX_CLIENTS) == -1)
        {
            perror("listen");
            return false;
        }

        port_ = port;

        std::cout << "server init ok, listening on port: " << port_ << "。" << std::endl;

        return true;
    }

    void Run()
    {
        std::thread reactorThread([&]()
                                  { reactor_.Run(mainSocket_); });

        // 等待Reactor線程結束
        reactorThread.join();
    }

private:
    int mainSocket_;
    int port_;
    Reactor reactor_;
};

int main(int argc, char *argv[])
{
    if (argc < 2)
    {
        std::cerr << "please input the port of server。" << std::endl;
        return -1;
    }

    int port = atoi(argv[1]);
    ThreadPool threadPool(MAX_THREADS); // 創(chuàng)建線程池
    Server server(threadPool);          // 將線程池傳遞給服務器構造函數(shù)

    if (!server.Init(port))
    {
        std::cerr << "int server failed :" << port << std::endl;
        return 1;
    }

    server.Run();

    return 0;
}

  • 調試: Linux下nc命令作為客戶端:
    nc 127.0.0.1 7777

文章來源地址http://www.zghlxwxcb.cn/news/detail-694081.html

到了這里,關于服務器IO復用reactor模式的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Linux學習記錄——??? 高級IO(5)--- Epoll型服務器(2)(Reactor)

    本篇基于上篇代碼繼續(xù)改進,很長。關于Reactor的說明在后一篇 上面的代碼在處理讀事件時,用的request數(shù)組是臨時的,如果有數(shù)據(jù)沒讀完,那么下次再來到這里,就沒有這些數(shù)據(jù)了。所以得讓每一個fd都有自己的緩沖區(qū)。建立一個Connection類,然后有一個map結構,讓這個類和每

    2024年01月20日
    瀏覽(20)
  • 【高并發(fā)服務器 02】——線程池與IO多路復用

    線程池的好處 :所有的池都是為了事先把資源準備好,在后續(xù)用的時候可以更加方便的拿到這個資源—— 不用去申請、釋放資源 什么時候用線程池 ? IO事務并發(fā)較高 :人在杭州,但是數(shù)據(jù)庫在北京,想要查詢數(shù)據(jù)庫,需要通過互聯(lián)網建立TCP三次握手,頻繁地創(chuàng)建和銷毀線

    2024年03月23日
    瀏覽(24)
  • TCP服務器的演變過程:IO多路復用機制select實現(xiàn)TCP服務器

    TCP服務器的演變過程:IO多路復用機制select實現(xiàn)TCP服務器

    手把手教你從0開始編寫TCP服務器程序,體驗開局一塊磚,大廈全靠壘。 為了避免篇幅過長使讀者感到乏味,對【TCP服務器的開發(fā)】進行分階段實現(xiàn),一步步進行優(yōu)化升級。 本節(jié),在上一章節(jié)的基礎上,將并發(fā)的實現(xiàn)改為IO多路復用機制,使用select管理每個新接入的客戶端連

    2024年02月03日
    瀏覽(15)
  • 【TCP服務器的演變過程】使用IO多路復用器epoll實現(xiàn)TCP服務器

    【TCP服務器的演變過程】使用IO多路復用器epoll實現(xiàn)TCP服務器

    手把手教你從0開始編寫TCP服務器程序,體驗開局一塊磚,大廈全靠壘。 為了避免篇幅過長使讀者感到乏味,對【TCP服務器的開發(fā)】進行分階段實現(xiàn),一步步進行優(yōu)化升級。 本節(jié),在上一章節(jié)的基礎上,將IO多路復用機制select改為更高效的IO多路復用機制epoll,使用epoll管理每

    2024年01月17日
    瀏覽(16)
  • IO多路復用中select的TCP服務器模型和poll服務模型

    服務器端 客戶端 poll客戶端

    2024年02月12日
    瀏覽(29)
  • Linux多路IO復用技術——epoll詳解與一對多服務器實現(xiàn)

    Linux多路IO復用技術——epoll詳解與一對多服務器實現(xiàn)

    本文詳細介紹了Linux中epoll模型的優(yōu)化原理和使用方法,以及如何利用epoll模型實現(xiàn)簡易的一對多服務器。通過對epoll模型的優(yōu)化和相關接口的解釋,幫助讀者理解epoll模型的工作原理和優(yōu)缺點,同時附帶代碼實現(xiàn)和圖解說明。

    2024年02月05日
    瀏覽(25)
  • 架構篇19:單服務器高性能模式-Reactor與Proactor

    架構篇19:單服務器高性能模式-Reactor與Proactor

    上篇介紹了單服務器高性能的 PPC 和 TPC 模式,它們的優(yōu)點是實現(xiàn)簡單,缺點是都無法支撐高并發(fā)的場景,尤其是互聯(lián)網發(fā)展到現(xiàn)在,各種海量用戶業(yè)務的出現(xiàn),PPC 和 TPC 完全無能為力。今天我將介紹可以應對高并發(fā)場景的單服務器高性能架構模式:Reactor 和 Proactor。 PPC 模式

    2024年01月25日
    瀏覽(19)
  • 使用IO多路復用select完成TCP循環(huán)服務器接收客戶端消息并打印

    使用IO多路復用select完成TCP循環(huán)服務器接收客戶端消息并打印

    服務器 ? ? ? 客戶端 ? ? 結果 ? ?

    2024年02月12日
    瀏覽(26)
  • 【網絡進階】服務器模型Reactor與Proactor

    【網絡進階】服務器模型Reactor與Proactor

    在高并發(fā)編程和網絡連接的消息處理中,通常可分為兩個階段:等待消息就緒和消息處理。當使用默認的阻塞套接字時(例如每個線程專門處理一個連接),這兩個階段往往是合并的。因此,處理套接字的線程需要等待消息就緒,這在高并發(fā)場景下導致線程頻繁地休眠和喚醒

    2024年02月01日
    瀏覽(15)
  • 【Linux】高級IO --- Reactor網絡IO設計模式

    【Linux】高級IO --- Reactor網絡IO設計模式

    人其實很難抵制誘惑,人只能遠離誘惑,所以千萬不要高看自己的定力。 1. 多路轉接接口select poll epoll所做的工作其實都是事件通知,只向上層通知事件到來,處理就緒事件的工作并不由這些API來完成,這些接口在進行事件通知時,有沒有自己的策略呢? 其實是有的,在網絡

    2024年02月09日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包