国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

TCP服務(wù)器的演變過程:使用epoll構(gòu)建reactor網(wǎng)絡(luò)模型實現(xiàn)百萬級并發(fā)(詳細(xì)代碼)

這篇具有很好參考價值的文章主要介紹了TCP服務(wù)器的演變過程:使用epoll構(gòu)建reactor網(wǎng)絡(luò)模型實現(xiàn)百萬級并發(fā)(詳細(xì)代碼)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、前言

手把手教你從0開始編寫TCP服務(wù)器程序,體驗開局一塊磚,大廈全靠壘。

為了避免篇幅過長使讀者感到乏味,對【TCP服務(wù)器的開發(fā)】進行分階段實現(xiàn),一步步進行優(yōu)化升級。

本節(jié),在上一章節(jié)介紹了如何使用epoll開發(fā)高效的服務(wù)器,本節(jié)將介紹使用epoll構(gòu)建reactor網(wǎng)絡(luò)模型,實現(xiàn)異步事件處理。

網(wǎng)絡(luò)并發(fā),通俗的講就是服務(wù)器可以承載的客戶端數(shù)量,即服務(wù)器可以穩(wěn)定保證客戶端同時接入的數(shù)量。

二、reactor簡介

Reactor模型開發(fā)效率比直接使用IO多路復(fù)用要高,它一般是單線程的,設(shè)計目標(biāo)是希望一個線程使用CPU的全部資源;帶來的優(yōu)點是,在每個事件處理中很多時候不需要考慮共享資源的互斥訪問。

Reactor模式是處理并發(fā)IO比較常見的模式,用于同步IO,核心思想是將所有要處理的IO事件注冊到一個中心IO多路復(fù)用器上,同時主線程或進程阻塞在IO多路復(fù)用器上;一旦有事件到來或準(zhǔn)備就緒,多路復(fù)用器返回并將事先注冊的相應(yīng) I/O 事件分發(fā)到對應(yīng)的處理器中。

Reactor的優(yōu)點:

  1. 響應(yīng)快;不必為單個同步事件阻塞,雖然Reactor本身依然是同步的。

  2. 編程相對簡單;可以最大程度的避免復(fù)雜的多線程及同步問題,盡可能的避免多線程、多進程的切換開銷。

  3. 可擴展性;可通過增加Reactor實例個數(shù),充分利用CPU資源。

  4. 高復(fù)用性;Reactor模型本身與事件處理邏輯無關(guān),具有很高的復(fù)用性。

三、實現(xiàn)步驟

3.1、step 1:定義Reactor模型相關(guān)結(jié)構(gòu)體

reactor數(shù)據(jù)結(jié)構(gòu)設(shè)計圖如下:
TCP服務(wù)器的演變過程:使用epoll構(gòu)建reactor網(wǎng)絡(luò)模型實現(xiàn)百萬級并發(fā)(詳細(xì)代碼),Linux網(wǎng)絡(luò)設(shè)計,網(wǎng)絡(luò),tcp/ip,服務(wù)器,開發(fā)語言,網(wǎng)絡(luò)協(xié)議,c語言,linux
結(jié)構(gòu)說明:以fd作為索引,存放在block中;當(dāng)一個fd到來時,通過fd/MAX先找到fd對應(yīng)的block號,再通過fd%MAX找到對應(yīng)的偏移地址。例如來了個fd=10000,每個塊存放的最大item數(shù)量MAX=1024,那么fd對應(yīng)的block序號等于10000/1024=9;偏移量等于10000%1024=784。這樣就可以找到fd對應(yīng)的數(shù)據(jù)存放地址item。

數(shù)據(jù)結(jié)構(gòu)的代碼實現(xiàn)如下:

struct socket_item
{
    /* data */
    int     fd;                 // socket的文件描述符
    char    *write_buffer;      // 寫緩沖區(qū)
    char    *read_buffer;       // 讀緩沖區(qū)
    int     write_length;       // 已讀字節(jié)數(shù)
    int     read_length;        // 已寫字節(jié)數(shù)

    int     status;             // 狀態(tài)標(biāo)識,設(shè)置epfd的操作模式

    int     event;              // 事件類型
    void    *arg;               // 回調(diào)函數(shù)的參數(shù)
    int(*callback)(int fd,int events,void* arg);    // 回調(diào)函數(shù)
};


struct event_block
{
    /* data */
    struct socket_item  *items;     // 事件集合
    struct event_block  *next;      // 指向像一個內(nèi)存塊
};

struct net_reactor
{
    /* data */
    int epollfd;                        // 事件塊的數(shù)量
    int block_count;                    // 事件塊的數(shù)量
    int finish_reactor;                 // 判斷是否退出服務(wù)
    struct event_block  *start_block;   // 事件塊的起始地址
};

3.2、step 2:實現(xiàn)Reactor容器初始化功能

我們這里使用epoll作為IO多路復(fù)用器。
思路:初始化reactor內(nèi)存塊,避免臟數(shù)據(jù);創(chuàng)建events和block并初始化,將events添加到block中,將block添加到reactor的鏈表中管理。

// 2.
int init_reactor(struct net_reactor *reactor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    memset(reactor,0,sizeof(struct net_reactor));

    // 創(chuàng)建epoll,作為IO多路復(fù)用器
    reactor->epollfd=epoll_create(1);
    if(reactor->epollfd==-1){
        printf("create epfd in %s error %s\n", __func__, strerror(errno));
        return REACTOR_CREATE_EPOLL_FAILED;
    }

    // 創(chuàng)建事件集
    struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
    if(items==NULL)
    {
        printf("create socket_item in %s error %s\n", __func__, strerror(errno));
        close(reactor->epollfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));

    // 創(chuàng)建事件內(nèi)存塊
    struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
    if(block==NULL)
    {
        printf("create block in %s error %s\n", __func__, strerror(errno));
        free(items);
        close(reactor->epollfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(block,0,sizeof(struct event_block));

    block->items=items;
    block->next=NULL;

    reactor->block_count=1;
    reactor->start_block=block;
    reactor->finish_reactor=0;


    return REACTOR_SUCCESS;
}

3.3、step 3:實現(xiàn)socket初始化功能

定義成一個函數(shù),方便初始化多個監(jiān)聽端口。

// 3.
int init_socket(short port)
{
    int fd=socket(AF_INET,SOCK_STREAM,0);
    if(fd==-1)
    {
        printf("create socket in %s error %s\n", __func__, strerror(errno));
		return -1;
    }

    int ret;
    // nonblock
	int flag = fcntl(fd, F_GETFL, 0);
	flag |= O_NONBLOCK;
	ret=fcntl(fd, F_SETFL, flag);
    if (ret == -1)
	{
		printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
        close(fd);
		return -1;
	}

    // 綁定
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_addr.s_addr=htonl(INADDR_ANY);
    server.sin_family=AF_INET;
    server.sin_port=htons(port);
    ret=bind(fd,(struct sockaddr*)&server,sizeof(server));
    if(ret==-1)
    {
        printf("bind() in %s error %s\n", __func__, strerror(errno));
        close(fd);
		return -1;
    }

    // 監(jiān)聽
    ret=listen(fd,LISTEN_BLK_SIZE);
    if(ret==-1)
    {
        printf("listen failed : %s\n", strerror(errno));
        close(fd);
		return -1;
    }
    printf("listen server port : %d\n", port);

    return fd;
}

3.4、step 4:實現(xiàn)Reactor動態(tài)擴容功能

為了實現(xiàn)高并發(fā),服務(wù)器需要監(jiān)聽多個端口。當(dāng)高并發(fā)時需要reactor容器進行擴容管理。

核心思路:找到鏈表的末端,分別為events和block分配內(nèi)存并初始化,將events添加到block中,將block添加到reactor的鏈表中管理。

// 4. 實現(xiàn)Reactor動態(tài)擴容功能
static int reactor_resize(struct net_reactor *reactor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    if(reactor->start_block==NULL)
        return REACTOR_NULL;
    
    // 找到鏈表末端
    struct event_block *cur_block=reactor->start_block;
    while(cur_block->next!=NULL)
    {
        cur_block=cur_block->next;
    }

    // 創(chuàng)建事件集
    struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
    if(items==NULL)
    {
        printf("create socket_item in %s error %s\n", __func__, strerror(errno));
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));

    // 創(chuàng)建事件內(nèi)存塊
    struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
    if(block==NULL)
    {
        printf("create block in %s error %s\n", __func__, strerror(errno));
        free(items);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(block,0,sizeof(struct event_block));

    block->next=NULL;
    block->items=items;

    cur_block->next=block;
    
    reactor->block_count++;

    return REACTOR_SUCCESS;
}

3.5、step 5:實現(xiàn)Reactor索引功能

思路:通過fd/MAX先找到fd對應(yīng)的block號,再通過fd%MAX找到對應(yīng)的偏移地址。

例如來了個fd=10000,每個塊存放的最大item數(shù)量MAX=1024,那么fd對應(yīng)的block序號等于10000/1024=9;偏移量等于10000%1024=784。這樣就可以找到fd對應(yīng)的數(shù)據(jù)存放地址item。

// 5. 實現(xiàn)Reactor索引功能
static struct socket_item *reactor_index(struct net_reactor *reactor,int socketfd)
{
    if(reactor==NULL)
        return NULL;
    if(reactor->start_block==NULL)
        return NULL;
    
    // fd所在block序號
    int block_id=socketfd/MAX_SOCKET_ITEMS;

    // block序號不存在時自動擴容
    while(block_id>=reactor->block_count)
    {
        if(reactor_resize(reactor)<0)
        {
            printf("reactor_resize in %s error %s\n", __func__, strerror(errno));
            return NULL;
        }
    }

    // 找到fd對應(yīng)block的位置
    struct event_block *cur_block=reactor->start_block;
    int i=0;
    while(i++ !=block_id && cur_block!=NULL)
    {
        cur_block=cur_block->next;
    }

    return &cur_block->items[socketfd%MAX_SOCKET_ITEMS];
}

3.6、step 6:實現(xiàn)設(shè)置事件信息功能

將事件的相關(guān)信息保存到數(shù)據(jù)結(jié)構(gòu)中。主要實現(xiàn)填充關(guān)鍵信息到event結(jié)構(gòu)體中。

// 6. 實現(xiàn)設(shè)置事件信息功能
static void reactor_event_set(int fd,struct socket_item *sockevent,NCALLBACK callback,void *arg)
{
    sockevent->arg=arg;
    sockevent->callback=callback;
    sockevent->event=0;
    sockevent->fd=fd;
}

3.7、step 7:實現(xiàn)IO事件監(jiān)聽功能

這里使用epoll作為IO多路復(fù)用器,將事件添加到epoll中監(jiān)聽。
思路:主要是epoll_ctl操作,將事件添加到reactor的event結(jié)構(gòu)體中。

// 7. 實現(xiàn)設(shè)置IO事件監(jiān)聽功能
static int reactor_event_add(int epollfd,int events,struct socket_item *sockevent)
{
    struct epoll_event ep_events={0,{0}};
    ep_events.data.ptr=sockevent;
    ep_events.events=events;
    sockevent->event=events;

    // 判斷,設(shè)置epfd的操作模式
    int options;
    if(sockevent->status==1)
    {
        options=EPOLL_CTL_MOD;
    }
    else{
        options=EPOLL_CTL_ADD;
        sockevent->status=1;
    }

    if(epoll_ctl(epollfd,options,sockevent->fd,&ep_events)<0)
    {
        printf("event add failed [fd=%d], events[%d]\n", sockevent->fd, events);
		printf("event add failed in %s error %s\n", __func__, strerror(errno));
		return -1;
    }
    return 0;
}

3.8、step 8:實現(xiàn)IO事件移除功能

由于設(shè)置了非阻塞模式,當(dāng)事件到來時,需要暫時移除監(jiān)聽,避免干擾。

// 8. 實現(xiàn)IO事件移除功能
static int reactor_event_del(int epollfd,struct socket_item *sockevent)
{
    if(sockevent->status!=1)
        return -1;
    struct epoll_event ep_events={0,{0}};
    ep_events.data.ptr=sockevent;
    
    sockevent->status=0;
    // 移除fd的監(jiān)聽
	if(epoll_ctl(epollfd, EPOLL_CTL_DEL,sockevent->fd, &ep_events)<0)
    {
        printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
		return -1;
    }

    return 0;
}

3.9、step 9:實現(xiàn)Reactor事件監(jiān)聽功能

思路:設(shè)置fd的事件信息,添加事件到epoll監(jiān)聽。

// 9. 實現(xiàn)Reactor事件監(jiān)聽功能
int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    if(reactor->start_block==NULL)
        return REACTOR_NULL;
    
    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,sockfd);
    if(item==NULL)
    {
        printf("reactor_index failed in %s error %s\n", __func__, strerror(errno));
        return REACTOR_ADD_LISTEN_FAILED;
    }

    reactor_event_set(sockfd,item,acceptor,reactor);

    if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
    {
        return REACTOR_ADD_LISTEN_FAILED;
    }
    printf("add listen fd = %d\n",sockfd);
    return REACTOR_SUCCESS;
}

3.10、step 10:實現(xiàn)recv回調(diào)函數(shù)

思路:找到fd對應(yīng)的信息內(nèi)存塊;使用recv接收數(shù)據(jù);暫時移除該事件的監(jiān)聽;如果接收成功,設(shè)置監(jiān)聽事件為是否可寫,添加到IO多路復(fù)用器(epoll)中;返回收到的數(shù)據(jù)長度。

// 10:實現(xiàn)recv回調(diào)函數(shù)
static int callback_recv(int fd, int events, void *arg)
{
    struct net_reactor *reactor=(struct net_reactor *)arg;
    if(reactor==NULL)
        return REACTOR_NULL;
    
    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,fd);
    if(item==NULL)
    {
        printf("callback_recv in %s error %s\n", __func__, strerror(errno));
        return REACTOR_MALLOC_MEMORY_FAILED;
    }

    // 接收數(shù)據(jù)
    int ret= recv(fd,item->read_buffer,BUFFER_LENGTH,0);

    // 暫時移除監(jiān)聽
	if(reactor_event_del(reactor->epollfd, item)<0)
    {
        printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
		//return REACTOR_EVENT_DEL_FAILED;
    }
    if(ret>0)
    {
        item->read_length+=ret;
        printf("recv [%d]:%s\n", fd, item->read_buffer);

        // demo
        memcpy(item->write_buffer,item->read_buffer,ret);
        item->write_buffer[ret]='\0';
        item->write_length=ret;

        reactor_event_set(fd,item,callback_send,reactor);
        if(reactor_event_add(reactor->epollfd,EPOLLOUT,item)<0)
        {
            printf("reactor_event_add failed in %s error %s\n", __func__, strerror(errno));
            //return REACTOR_ADD_LISTEN_FAILED;
        }

    }
    else if(ret==0)
    {
		printf("recv_cb --> disconnect\n");
        free(item->read_buffer);
        free(item->write_buffer);
		close(item->fd);
    }
    else
    {
        if(errno==EAGAIN || errno==EWOULDBLOCK)
        {
            // 表示沒有數(shù)據(jù)可讀。這時可以繼續(xù)等待數(shù)據(jù)到來,或者關(guān)閉套接字。
        }
        else if (errno == ECONNRESET) {
			// reactor_event_del(reactor->epollfd, item);
            free(item->read_buffer);
            free(item->write_buffer);
			close(item->fd);
		}
		printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return ret;
}

3.11、step 11:實現(xiàn)send回調(diào)函數(shù)

思路:找到fd對應(yīng)的信息內(nèi)存塊;使用send發(fā)送數(shù)據(jù);暫時移除該事件的監(jiān)聽;如果發(fā)送成功,設(shè)置監(jiān)聽事件為是否可讀,添加到IO多路復(fù)用器(epoll)中;返回發(fā)送的數(shù)據(jù)長度。

// 11:實現(xiàn)send回調(diào)函數(shù)
static int callback_send(int fd, int events, void *arg)
{
    struct net_reactor *reactor=(struct net_reactor *)arg;
    if(reactor==NULL)
    {
        return REACTOR_NULL;
    }

    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,fd);
    if(item==NULL)
    {
        printf("callback_recv in %s error %s\n", __func__, strerror(errno));
        return REACTOR_MALLOC_MEMORY_FAILED;
    }

    int ret=send(fd,item->write_buffer,item->write_length,0);
    // 暫時移除監(jiān)聽
	if(reactor_event_del(reactor->epollfd, item)<0)
    {
        printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
		//return REACTOR_EVENT_DEL_FAILED;
    }
    if (ret > 0)
	{
		
		printf("send[fd=%d], [%d]%s\n", fd, ret, item->write_buffer);
		reactor_event_set(fd, item, callback_recv, reactor);
		reactor_event_add(reactor->epollfd, EPOLLIN, item);
	}
	else
	{
        free(item->read_buffer);
        free(item->write_buffer);
		close(fd);
		printf("send[fd=%d] error %s\n", fd, strerror(errno));
	}
    
    return ret;
}

3.12、step 12:實現(xiàn)accept回調(diào)函數(shù)

思路:使用accept獲得連接的客戶端fd;設(shè)置客戶端fd為非阻塞模式;找到fd對應(yīng)的信息內(nèi)存塊;設(shè)置fd的事件信息;設(shè)置監(jiān)聽事件為是否可讀,添加到IO多路復(fù)用器(epoll)中。

// 12. 實現(xiàn)accept回調(diào)函數(shù)
int callback_accept(int fd, int events, void *arg)
{
    struct net_reactor *reactor=(struct net_reactor *)arg;
    if(reactor==NULL)
    {
        return REACTOR_NULL;
    }

    struct sockaddr_in client;
    socklen_t len=sizeof(client);
    int connectfd=accept(fd,(struct sockaddr*)&client,&len);
    if(connectfd<0)
    {
        printf("accept failed in %s error %s\n", __func__, strerror(errno));
        return REACTOR_ACCEPT_FAILED;
    }

    // 設(shè)置非阻塞
    int flag = fcntl(connectfd, F_GETFL, 0);
	flag |= O_NONBLOCK;
	int ret=fcntl(connectfd, F_SETFL, flag);
    if (ret == -1)
	{
		printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
		return REACTOR_FCNTL_FAILED;
	}

    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,connectfd);
    if(item==NULL)
    {
        printf("reactor_index in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    
    // 設(shè)置fd的事件信息
    reactor_event_set(connectfd,item,callback_recv,reactor);

    // 添加事件到epoll監(jiān)聽
    if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
    {
        close(connectfd);
        return REACTOR_ADD_LISTEN_FAILED;
    }

    // 為fd分配好讀寫緩沖區(qū)
    item->read_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
    if(item->read_buffer==NULL)
    {
        printf("mallc in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(item->read_buffer,0,BUFFER_LENGTH * sizeof(char));
    item->read_length=0;
    item->write_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
    if(item->write_buffer==NULL)
    {
        printf("mallc in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
        free(item->read_buffer);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(item->write_buffer,0,BUFFER_LENGTH * sizeof(char));
    item->write_length=0;

    printf("new connect [%s:%d], pos[%d]\n",inet_ntoa(client.sin_addr), ntohs(client.sin_port), connectfd);

	return REACTOR_SUCCESS;
}

3.13、step 13:實現(xiàn)reactor運行函數(shù)

主要是epoll的等待功能,將監(jiān)聽到的事件進行回調(diào)處理。

// 13:實現(xiàn)reactor運行函數(shù)
int reactor_run(struct net_reactor *reactor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    if(reactor->start_block==NULL)
        return REACTOR_NULL;
    if(reactor->epollfd<0)
        return REACTOR_CREATE_EPOLL_FAILED;

    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    
    while(!reactor->finish_reactor)
    {
        int nready=epoll_wait(reactor->epollfd,events,MAX_EPOLL_EVENTS,1000);
        if(nready<0)
        {
            printf("epoll wait error\n");
			continue;
        }
        int i=0;
        for(i=0;i<nready;i++)
        {
            struct socket_item *item=(struct socket_item *)events[i].data.ptr;
            if((events[i].events & EPOLLIN) && (item->event&EPOLLIN))
            {
                // 處理可讀事件
				item->callback(item->fd, events[i].events, item->arg);
            }
            if((events[i].events & EPOLLOUT) && (item->event&EPOLLOUT))
            {
                // 處理可讀事件
				item->callback(item->fd, events[i].events, item->arg);
            }
        }

    }
    printf("Clearing memory......\n");
    reactor_destory(reactor);

    printf("finish reactor\n");
    return REACTOR_SUCCESS;
}

3.14、step 14:實現(xiàn)reactor銷毀功能

// 14:實現(xiàn)reactor銷毀功能
int reactor_destory(struct net_reactor *reactor)
{
	// 關(guān)閉epoll
	close(reactor->epollfd);

    
	struct event_block *blk= reactor->start_block;
	struct event_block *next;

	while (blk != NULL)
	{
		next = blk->next;
		// 釋放內(nèi)存塊
        for(int i=0;i<MAX_SOCKET_ITEMS;i++)
        {
            if(blk->items[i].read_buffer!=NULL)
            {
                free(blk->items[i].read_buffer);
                blk->items[i].read_buffer=NULL;
            }
            if(blk->items[i].write_buffer!=NULL)
            {
                free(blk->items[i].write_buffer);
                blk->items[i].write_buffer=NULL;
            }
        }
		free(blk->items);
		free(blk);
		blk = next;
	}
	return REACTOR_SUCCESS;
}

3.15、使用示例

  1. 創(chuàng)建structnet_reactor對象。
  2. 調(diào)用init_reactor初始化。
  3. 調(diào)用init_socket監(jiān)聽端口。
  4. 調(diào)用reactor_add_listener將端口添加到reactor中管理。
  5. 調(diào)用reactor_run運行reactor服務(wù)。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "reactor.h"

#define SERVER_PORT     9703
#define PORT_COUNT      2

int main(int argc,char **argv)
{
    struct net_reactor *reactor=(struct net_reactor *)malloc(sizeof(struct net_reactor));
    if(reactor==NULL)
    {
        perror("malloc struct net_reactor failed!\n");
        return REACTOR_MALLOC_MEMORY_FAILED;
    }

    if(init_reactor(reactor)<0)
    {
        free(reactor);
        return REACTOR_NULL;
    }

    unsigned short port = SERVER_PORT;
    int sockfds[PORT_COUNT]={0};
    int i=0;
    for(i=0;i<PORT_COUNT;i++)
    {
        sockfds[i]=init_socket(port+i);
        if(sockfds[i]>0)
        {
            if(reactor_add_listener(reactor,sockfds[i],callback_accept)<0)
                printf("reactor_add_listener failed in %s : %d\n",__func__,__LINE__);
        }
        else
        {
            printf("init_socket failed in %s : %d\n",__func__,__LINE__);
        }
        
    }

    reactor_run(reactor);

    // 銷毀 reactor
	//reactor_destory(reactor);
    reactor->finish_reactor=1;

    // 關(guān)閉socket集
    for(i=0;i<PORT_COUNT;i++)
    {
        close(sockfds[i]);
    }

    // 釋放reactor
	free(reactor);

    return 0;

}

編譯:

gcc -o server main.c reactor.c

四、完整代碼

reactor.h

#ifndef _REACTOR_H_
#define _REACTOR_H_

enum REACTOR_ERROR_CODE{
    REACTOR_EVENT_DEL_FAILED=-7,
    REACTOR_FCNTL_FAILED=-6,
    REACTOR_ACCEPT_FAILED=-5,
    REACTOR_ADD_LISTEN_FAILED=-4,
    REACTOR_MALLOC_MEMORY_FAILED=-3,
    REACTOR_CREATE_EPOLL_FAILED=-2,
    REACTOR_NULL=-1,
    REACTOR_SUCCESS=0
};

struct socket_item
{
    /* data */
    int     fd;                 // socket的文件描述符
    char    *write_buffer;      // 寫緩沖區(qū)
    char    *read_buffer;       // 讀緩沖區(qū)
    int     write_length;       // 已讀字節(jié)數(shù)
    int     read_length;        // 已寫字節(jié)數(shù)

    int     status;             // 狀態(tài)標(biāo)識,設(shè)置epfd的操作模式

    int     event;              // 事件類型
    void    *arg;               // 回調(diào)函數(shù)的參數(shù)
    int(*callback)(int fd,int events,void* arg);    // 回調(diào)函數(shù)
};

struct event_block
{
    /* data */
    struct socket_item  *items;     // 事件集合
    struct event_block  *next;      // 指向像一個內(nèi)存塊
};

struct net_reactor
{
    /* data */
    int epollfd;                        // 事件塊的數(shù)量
    int block_count;                    // 事件塊的數(shù)量
    int finish_reactor;                 // 判斷是否退出服務(wù)
    struct event_block  *start_block;   // 事件塊的起始地址
};

typedef int NCALLBACK(int ,int, void*);


int init_reactor(struct net_reactor *reactor);
int init_socket(short port);
int callback_accept(int fd, int events, void *arg);
int reactor_destory(struct net_reactor *reactor);
int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor);
int reactor_run(struct net_reactor *reactor);

static int callback_send(int fd, int events, void *arg);
static int callback_recv(int fd, int events, void *arg);

#endif

reactor.c

#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>

#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <fcntl.h>

#include "reactor.h"

#define MAX_SOCKET_ITEMS    1024
#define LISTEN_BLK_SIZE     20
#define BUFFER_LENGTH       1024
#define MAX_EPOLL_EVENTS    1024




// 2.
int init_reactor(struct net_reactor *reactor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    memset(reactor,0,sizeof(struct net_reactor));

    // 創(chuàng)建epoll,作為IO多路復(fù)用器
    reactor->epollfd=epoll_create(1);
    if(reactor->epollfd==-1){
        printf("create epfd in %s error %s\n", __func__, strerror(errno));
        return REACTOR_CREATE_EPOLL_FAILED;
    }

    // 創(chuàng)建事件集
    struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
    if(items==NULL)
    {
        printf("create socket_item in %s error %s\n", __func__, strerror(errno));
        close(reactor->epollfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));

    // 創(chuàng)建事件內(nèi)存塊
    struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
    if(block==NULL)
    {
        printf("create block in %s error %s\n", __func__, strerror(errno));
        free(items);
        close(reactor->epollfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(block,0,sizeof(struct event_block));

    block->items=items;
    block->next=NULL;

    reactor->block_count=1;
    reactor->start_block=block;
    reactor->finish_reactor=0;


    return REACTOR_SUCCESS;
}

// 3.
int init_socket(short port)
{
    int fd=socket(AF_INET,SOCK_STREAM,0);
    if(fd==-1)
    {
        printf("create socket in %s error %s\n", __func__, strerror(errno));
		return -1;
    }

    int ret;
    // nonblock
	int flag = fcntl(fd, F_GETFL, 0);
	flag |= O_NONBLOCK;
	ret=fcntl(fd, F_SETFL, flag);
    if (ret == -1)
	{
		printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
        close(fd);
		return -1;
	}

    // 綁定
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_addr.s_addr=htonl(INADDR_ANY);
    server.sin_family=AF_INET;
    server.sin_port=htons(port);
    ret=bind(fd,(struct sockaddr*)&server,sizeof(server));
    if(ret==-1)
    {
        printf("bind() in %s error %s\n", __func__, strerror(errno));
        close(fd);
		return -1;
    }

    // 監(jiān)聽
    ret=listen(fd,LISTEN_BLK_SIZE);
    if(ret==-1)
    {
        printf("listen failed : %s\n", strerror(errno));
        close(fd);
		return -1;
    }
    printf("listen server port : %d\n", port);

    return fd;
}

// 4. 實現(xiàn)Reactor動態(tài)擴容功能
static int reactor_resize(struct net_reactor *reactor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    if(reactor->start_block==NULL)
        return REACTOR_NULL;
    
    // 找到鏈表末端
    struct event_block *cur_block=reactor->start_block;
    while(cur_block->next!=NULL)
    {
        cur_block=cur_block->next;
    }

    // 創(chuàng)建事件集
    struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
    if(items==NULL)
    {
        printf("create socket_item in %s error %s\n", __func__, strerror(errno));
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));

    // 創(chuàng)建事件內(nèi)存塊
    struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
    if(block==NULL)
    {
        printf("create block in %s error %s\n", __func__, strerror(errno));
        free(items);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(block,0,sizeof(struct event_block));

    block->next=NULL;
    block->items=items;

    cur_block->next=block;
    
    reactor->block_count++;

    return REACTOR_SUCCESS;
}

// 5. 實現(xiàn)Reactor索引功能
static struct socket_item *reactor_index(struct net_reactor *reactor,int socketfd)
{
    if(reactor==NULL)
        return NULL;
    if(reactor->start_block==NULL)
        return NULL;
    
    // fd所在block序號
    int block_id=socketfd/MAX_SOCKET_ITEMS;

    // block序號不存在時自動擴容
    while(block_id>=reactor->block_count)
    {
        if(reactor_resize(reactor)<0)
        {
            printf("reactor_resize in %s error %s\n", __func__, strerror(errno));
            return NULL;
        }
    }

    // 找到fd對應(yīng)block的位置
    struct event_block *cur_block=reactor->start_block;
    int i=0;
    while(i++ !=block_id && cur_block!=NULL)
    {
        cur_block=cur_block->next;
    }

    return &cur_block->items[socketfd%MAX_SOCKET_ITEMS];
}

// 6. 實現(xiàn)設(shè)置事件信息功能
static void reactor_event_set(int fd,struct socket_item *sockevent,NCALLBACK callback,void *arg)
{
    sockevent->arg=arg;
    sockevent->callback=callback;
    sockevent->event=0;
    sockevent->fd=fd;
}

// 7. 實現(xiàn)設(shè)置IO事件監(jiān)聽功能
static int reactor_event_add(int epollfd,int events,struct socket_item *sockevent)
{
    struct epoll_event ep_events={0,{0}};
    ep_events.data.ptr=sockevent;
    ep_events.events=events;
    sockevent->event=events;

    // 判斷,設(shè)置epfd的操作模式
    int options;
    if(sockevent->status==1)
    {
        options=EPOLL_CTL_MOD;
    }
    else{
        options=EPOLL_CTL_ADD;
        sockevent->status=1;
    }

    if(epoll_ctl(epollfd,options,sockevent->fd,&ep_events)<0)
    {
        printf("event add failed [fd=%d], events[%d]\n", sockevent->fd, events);
		printf("event add failed in %s error %s\n", __func__, strerror(errno));
		return -1;
    }
    return 0;
}

// 8. 實現(xiàn)IO事件移除功能
static int reactor_event_del(int epollfd,struct socket_item *sockevent)
{
    if(sockevent->status!=1)
        return -1;
    struct epoll_event ep_events={0,{0}};
    ep_events.data.ptr=sockevent;
    
    sockevent->status=0;
    // 移除fd的監(jiān)聽
	if(epoll_ctl(epollfd, EPOLL_CTL_DEL,sockevent->fd, &ep_events)<0)
    {
        printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
		return -1;
    }

    return 0;
}

// 9. 實現(xiàn)Reactor事件監(jiān)聽功能
int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    if(reactor->start_block==NULL)
        return REACTOR_NULL;
    
    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,sockfd);
    if(item==NULL)
    {
        printf("reactor_index failed in %s error %s\n", __func__, strerror(errno));
        return REACTOR_ADD_LISTEN_FAILED;
    }

    reactor_event_set(sockfd,item,acceptor,reactor);

    if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
    {
        return REACTOR_ADD_LISTEN_FAILED;
    }
    printf("add listen fd = %d\n",sockfd);
    return REACTOR_SUCCESS;
}

// 10:實現(xiàn)recv回調(diào)函數(shù)
static int callback_recv(int fd, int events, void *arg)
{
    struct net_reactor *reactor=(struct net_reactor *)arg;
    if(reactor==NULL)
        return REACTOR_NULL;
    
    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,fd);
    if(item==NULL)
    {
        printf("callback_recv in %s error %s\n", __func__, strerror(errno));
        return REACTOR_MALLOC_MEMORY_FAILED;
    }

    // 接收數(shù)據(jù)
    int ret= recv(fd,item->read_buffer,BUFFER_LENGTH,0);

    // 暫時移除監(jiān)聽
	if(reactor_event_del(reactor->epollfd, item)<0)
    {
        printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
		//return REACTOR_EVENT_DEL_FAILED;
    }
    if(ret>0)
    {
        item->read_length+=ret;
        printf("recv [%d]:%s\n", fd, item->read_buffer);

        // demo
        memcpy(item->write_buffer,item->read_buffer,ret);
        item->write_buffer[ret]='\0';
        item->write_length=ret;

        reactor_event_set(fd,item,callback_send,reactor);
        if(reactor_event_add(reactor->epollfd,EPOLLOUT,item)<0)
        {
            printf("reactor_event_add failed in %s error %s\n", __func__, strerror(errno));
            //return REACTOR_ADD_LISTEN_FAILED;
        }

    }
    else if(ret==0)
    {
		printf("recv_cb --> disconnect\n");
        free(item->read_buffer);
        free(item->write_buffer);
		close(item->fd);
    }
    else
    {
        if(errno==EAGAIN || errno==EWOULDBLOCK)
        {
            // 表示沒有數(shù)據(jù)可讀。這時可以繼續(xù)等待數(shù)據(jù)到來,或者關(guān)閉套接字。
        }
        else if (errno == ECONNRESET) {
			// reactor_event_del(reactor->epollfd, item);
            free(item->read_buffer);
            free(item->write_buffer);
			close(item->fd);
		}
		printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return ret;
}

// 11:實現(xiàn)send回調(diào)函數(shù)
static int callback_send(int fd, int events, void *arg)
{
    struct net_reactor *reactor=(struct net_reactor *)arg;
    if(reactor==NULL)
    {
        return REACTOR_NULL;
    }

    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,fd);
    if(item==NULL)
    {
        printf("callback_recv in %s error %s\n", __func__, strerror(errno));
        return REACTOR_MALLOC_MEMORY_FAILED;
    }

    int ret=send(fd,item->write_buffer,item->write_length,0);
    // 暫時移除監(jiān)聽
	if(reactor_event_del(reactor->epollfd, item)<0)
    {
        printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
		//return REACTOR_EVENT_DEL_FAILED;
    }
    if (ret > 0)
	{
		
		printf("send[fd=%d], [%d]%s\n", fd, ret, item->write_buffer);
		reactor_event_set(fd, item, callback_recv, reactor);
		reactor_event_add(reactor->epollfd, EPOLLIN, item);
	}
	else
	{
        free(item->read_buffer);
        free(item->write_buffer);
		close(fd);
		printf("send[fd=%d] error %s\n", fd, strerror(errno));
	}
    
    return ret;
}

// 12. 實現(xiàn)accept回調(diào)函數(shù)
int callback_accept(int fd, int events, void *arg)
{
    struct net_reactor *reactor=(struct net_reactor *)arg;
    if(reactor==NULL)
    {
        return REACTOR_NULL;
    }

    struct sockaddr_in client;
    socklen_t len=sizeof(client);
    int connectfd=accept(fd,(struct sockaddr*)&client,&len);
    if(connectfd<0)
    {
        printf("accept failed in %s error %s\n", __func__, strerror(errno));
        return REACTOR_ACCEPT_FAILED;
    }

    // 設(shè)置非阻塞
    int flag = fcntl(connectfd, F_GETFL, 0);
	flag |= O_NONBLOCK;
	int ret=fcntl(connectfd, F_SETFL, flag);
    if (ret == -1)
	{
		printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
		return REACTOR_FCNTL_FAILED;
	}

    // 找到fd對應(yīng)的event地址
    struct socket_item *item=reactor_index(reactor,connectfd);
    if(item==NULL)
    {
        printf("reactor_index in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    
    // 設(shè)置fd的事件信息
    reactor_event_set(connectfd,item,callback_recv,reactor);

    // 添加事件到epoll監(jiān)聽
    if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
    {
        close(connectfd);
        return REACTOR_ADD_LISTEN_FAILED;
    }

    // 為fd分配好讀寫緩沖區(qū)
    item->read_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
    if(item->read_buffer==NULL)
    {
        printf("mallc in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(item->read_buffer,0,BUFFER_LENGTH * sizeof(char));
    item->read_length=0;
    item->write_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
    if(item->write_buffer==NULL)
    {
        printf("mallc in %s error %s\n", __func__, strerror(errno));
        close(connectfd);
        free(item->read_buffer);
        return REACTOR_MALLOC_MEMORY_FAILED;
    }
    memset(item->write_buffer,0,BUFFER_LENGTH * sizeof(char));
    item->write_length=0;

    printf("new connect [%s:%d], pos[%d]\n",inet_ntoa(client.sin_addr), ntohs(client.sin_port), connectfd);

	return REACTOR_SUCCESS;
}

// 13:實現(xiàn)reactor運行函數(shù)
int reactor_run(struct net_reactor *reactor)
{
    if(reactor==NULL)
        return REACTOR_NULL;
    if(reactor->start_block==NULL)
        return REACTOR_NULL;
    if(reactor->epollfd<0)
        return REACTOR_CREATE_EPOLL_FAILED;

    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    
    while(!reactor->finish_reactor)
    {
        int nready=epoll_wait(reactor->epollfd,events,MAX_EPOLL_EVENTS,1000);
        if(nready<0)
        {
            printf("epoll wait error\n");
			continue;
        }
        int i=0;
        for(i=0;i<nready;i++)
        {
            struct socket_item *item=(struct socket_item *)events[i].data.ptr;
            if((events[i].events & EPOLLIN) && (item->event&EPOLLIN))
            {
                // 處理可讀事件
				item->callback(item->fd, events[i].events, item->arg);
            }
            if((events[i].events & EPOLLOUT) && (item->event&EPOLLOUT))
            {
                // 處理可讀事件
				item->callback(item->fd, events[i].events, item->arg);
            }
        }

    }
    printf("Clearing memory......\n");
    reactor_destory(reactor);

    printf("finish reactor\n");
    return REACTOR_SUCCESS;
}

// 14:實現(xiàn)reactor銷毀功能
int reactor_destory(struct net_reactor *reactor)
{
	// 關(guān)閉epoll
	close(reactor->epollfd);

    
	struct event_block *blk= reactor->start_block;
	struct event_block *next;

	while (blk != NULL)
	{
		next = blk->next;
		// 釋放內(nèi)存塊
        for(int i=0;i<MAX_SOCKET_ITEMS;i++)
        {
            if(blk->items[i].read_buffer!=NULL)
            {
                free(blk->items[i].read_buffer);
                blk->items[i].read_buffer=NULL;
            }
            if(blk->items[i].write_buffer!=NULL)
            {
                free(blk->items[i].write_buffer);
                blk->items[i].write_buffer=NULL;
            }
        }
		free(blk->items);
		free(blk);
		blk = next;
	}
	return REACTOR_SUCCESS;
}

main.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "reactor.h"

#define SERVER_PORT     9703
#define PORT_COUNT      2

int main(int argc,char **argv)
{
    struct net_reactor *reactor=(struct net_reactor *)malloc(sizeof(struct net_reactor));
    if(reactor==NULL)
    {
        perror("malloc struct net_reactor failed!\n");
        return REACTOR_MALLOC_MEMORY_FAILED;
    }

    if(init_reactor(reactor)<0)
    {
        free(reactor);
        return REACTOR_NULL;
    }

    unsigned short port = SERVER_PORT;
    int sockfds[PORT_COUNT]={0};
    int i=0;
    for(i=0;i<PORT_COUNT;i++)
    {
        sockfds[i]=init_socket(port+i);
        if(sockfds[i]>0)
        {
            if(reactor_add_listener(reactor,sockfds[i],callback_accept)<0)
                printf("reactor_add_listener failed in %s : %d\n",__func__,__LINE__);
        }
        else
        {
            printf("init_socket failed in %s : %d\n",__func__,__LINE__);
        }
        
    }

    reactor_run(reactor);

    // 銷毀 reactor
	//reactor_destory(reactor);
    reactor->finish_reactor=1;

    // 關(guān)閉socket集
    for(i=0;i<PORT_COUNT;i++)
    {
        close(sockfds[i]);
    }

    // 釋放reactor
	free(reactor);

    return 0;

}

五、百萬級并發(fā)連接測試

自己實現(xiàn)的并發(fā)連接客戶端程序,使用三臺測試平臺分別執(zhí)行測試程序,平均一臺連接30w+。需要注意的是,為了幫助有足夠的fd可以分配,需要使用如下命令修改文件描述符限制:

ulimit -n 1024000

這個限制只在當(dāng)前會話中生效,重新登錄后將恢復(fù)為系統(tǒng)默認(rèn)值。

如果需要永久修改系統(tǒng)fd限制,可以在/etc/security/limits.conf文件中設(shè)置。打開該文件,在文件末尾添加如下內(nèi)容:

<用戶名> hard nofile 1024 
<用戶名> soft nofile 1024

另外,為了體現(xiàn)reactor的性能,需要將一些沒必要的打印關(guān)掉,因為打印會影響性能。

客戶端測試腳本代碼:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>

#include <sys/time.h>


#define MAX_BUFFER		128
#define MAX_EPOLLSIZE	(384*1024)
#define MAX_PORT		1

#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int isContinue = 0;

static int ntySetNonblock(int fd) {
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0) return -1;
	return 0;
}

static int ntySetReUseAddr(int fd) {
	int reuse = 1;
	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}



int main(int argc, char **argv) {
	if (argc <= 2) {
		printf("Usage: %s ip port\n", argv[0]);
		exit(0);
	}

	const char *ip = argv[1];
	int port = atoi(argv[2]);
	int connections = 0;
	char buffer[128] = {0};
	int i = 0, index = 0;

	struct epoll_event events[MAX_EPOLLSIZE];
	
	int epoll_fd = epoll_create(MAX_EPOLLSIZE);
	
	strcpy(buffer, " Data From MulClient\n");
		
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr(ip);

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);

	while (1) {
		if (++index >= MAX_PORT) index = 0;
		
		struct epoll_event ev;
		int sockfd = 0;

		if (connections < 340000 && !isContinue) {
			sockfd = socket(AF_INET, SOCK_STREAM, 0);
			if (sockfd == -1) {
				perror("socket");
				goto err;
			}

			//ntySetReUseAddr(sockfd);
			addr.sin_port = htons(port+index);

			if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
				perror("connect");
				goto err;
			}
			ntySetNonblock(sockfd);
			ntySetReUseAddr(sockfd);

			sprintf(buffer, "Hello Server: client --> %d\n", connections);
			send(sockfd, buffer, strlen(buffer), 0);

			ev.data.fd = sockfd;
			ev.events = EPOLLIN | EPOLLOUT;
			epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
		
			connections ++;
		}
		//connections ++;
		if (connections % 1000 == 999 || connections >= 340000) {
			struct timeval tv_cur;
			memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
			
			gettimeofday(&tv_begin, NULL);

			int time_used = TIME_SUB_MS(tv_begin, tv_cur);
			printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

			int nfds = epoll_wait(epoll_fd, events, connections, 100);
			for (i = 0;i < nfds;i ++) {
				int clientfd = events[i].data.fd;

				if (events[i].events & EPOLLOUT) {
					sprintf(buffer, "data from %d\n", clientfd);
					send(sockfd, buffer, strlen(buffer), 0);
				} else if (events[i].events & EPOLLIN) {
					char rBuffer[MAX_BUFFER] = {0};				
					ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
					if (length > 0) {
						printf(" RecvBuffer:%s\n", rBuffer);

						if (!strcmp(rBuffer, "quit")) {
							isContinue = 0;
						}
						
					} else if (length == 0) {
						printf(" Disconnect clientfd:%d\n", clientfd);
						connections --;
						close(clientfd);
					} else {
						if (errno == EINTR) continue;

						printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
						close(clientfd);
					}
				} else {
					printf(" clientfd:%d, errno:%d\n", clientfd, errno);
					close(clientfd);
				}
			}
		}

		usleep(500);
	}

	return 0;

err:
	printf("error : %s\n", strerror(errno));
	return 0;
	
}

小結(jié)

至此,我們最終實現(xiàn)了支持高并發(fā)的服務(wù)器程序,但是,這個服務(wù)器程序有些局限性,我們還要繼續(xù)改善、優(yōu)化。在改進之前,需要開發(fā)一個后臺日志模塊,這是服務(wù)器程序必須的,所有,下一個章節(jié)將開發(fā)一個高效的后臺日志模塊。

TCP服務(wù)器的演變過程:使用epoll構(gòu)建reactor網(wǎng)絡(luò)模型實現(xiàn)百萬級并發(fā)(詳細(xì)代碼),Linux網(wǎng)絡(luò)設(shè)計,網(wǎng)絡(luò),tcp/ip,服務(wù)器,開發(fā)語言,網(wǎng)絡(luò)協(xié)議,c語言,linux文章來源地址http://www.zghlxwxcb.cn/news/detail-788913.html

到了這里,關(guān)于TCP服務(wù)器的演變過程:使用epoll構(gòu)建reactor網(wǎng)絡(luò)模型實現(xiàn)百萬級并發(fā)(詳細(xì)代碼)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • TCP服務(wù)器的演變過程:IO多路復(fù)用機制select實現(xiàn)TCP服務(wù)器

    TCP服務(wù)器的演變過程:IO多路復(fù)用機制select實現(xiàn)TCP服務(wù)器

    手把手教你從0開始編寫TCP服務(wù)器程序,體驗開局一塊磚,大廈全靠壘。 為了避免篇幅過長使讀者感到乏味,對【TCP服務(wù)器的開發(fā)】進行分階段實現(xiàn),一步步進行優(yōu)化升級。 本節(jié),在上一章節(jié)的基礎(chǔ)上,將并發(fā)的實現(xiàn)改為IO多路復(fù)用機制,使用select管理每個新接入的客戶端連

    2024年02月03日
    瀏覽(15)
  • 【TCP服務(wù)器的演變過程】編寫第一個TCP服務(wù)器:實現(xiàn)一對一的連接通信

    【TCP服務(wù)器的演變過程】編寫第一個TCP服務(wù)器:實現(xiàn)一對一的連接通信

    手把手教你從0開始編寫TCP服務(wù)器程序,體驗 開局一塊磚,大廈全靠壘 。 為了避免篇幅過長使讀者感到乏味,對【TCP服務(wù)器的開發(fā)】進行分階段實現(xiàn),一步步進行優(yōu)化升級。 函數(shù)原型: 這個函數(shù)建立一個協(xié)議族、協(xié)議類型、協(xié)議編號的socket文件描述符。如果函數(shù)調(diào)用成功,

    2024年02月03日
    瀏覽(21)
  • C/S架構(gòu)學(xué)習(xí)之使用epoll實現(xiàn)TCP特大型并發(fā)服務(wù)器

    epoll實現(xiàn)TCP特大型并發(fā)服務(wù)器的流程: 一、創(chuàng)建套接字(socket函數(shù)): 通信域 選擇 IPV4 網(wǎng)絡(luò)協(xié)議、套接字類型選擇 流式 ; 二、填充服務(wù)器和客戶機的網(wǎng)絡(luò)信息結(jié)構(gòu)體: 1.分別定義服務(wù)器網(wǎng)絡(luò)信息結(jié)構(gòu)體變量 serveraddr 和客戶機網(wǎng)絡(luò)信息結(jié)構(gòu)體變量 clientaddr ; 2.分別求出服務(wù)

    2024年02月08日
    瀏覽(25)
  • 基于epoll的TCP服務(wù)器端(C++)

    網(wǎng)絡(luò)編程——C++實現(xiàn)socket通信(TCP)高并發(fā)之epoll模式_tcp通信c++ 多客戶端epoll_n大橘為重n的博客-CSDN博客 網(wǎng)絡(luò)編程——C++實現(xiàn)socket通信(TCP)高并發(fā)之select模式_n大橘為重n的博客-CSDN博客 server.cpp? ?client.cpp

    2024年02月12日
    瀏覽(20)
  • 用反應(yīng)器模式和epoll構(gòu)建百萬并發(fā)服務(wù)器

    用反應(yīng)器模式和epoll構(gòu)建百萬并發(fā)服務(wù)器

    此處的百萬并發(fā)指的是可以建立至少100w個客戶端連接,不考慮業(yè)務(wù)處理。 反應(yīng)器模式下的epoll相比起普通的epoll不同在于:普通的epoll在獲取到就緒狀態(tài)的event結(jié)構(gòu)體之后,先判斷是什么類型的fd,再進行操作。而reactor先判斷是什么類型的事件,再進行操作。本文從頭用react

    2024年02月02日
    瀏覽(25)
  • TCP高并發(fā)服務(wù)器簡介(select、poll、epoll實現(xiàn)與區(qū)別)

    TCP高并發(fā)服務(wù)器簡介(select、poll、epoll實現(xiàn)與區(qū)別)

    一、創(chuàng)建套接字(socket函數(shù)): 二、填充服務(wù)器的網(wǎng)絡(luò)信息結(jié)構(gòu)體: 三、套接字和服務(wù)器的網(wǎng)絡(luò)信息結(jié)構(gòu)體進行綁定(bind函數(shù)): 四、套接字設(shè)置成被動監(jiān)聽(listen函數(shù)): 五、創(chuàng)建要監(jiān)聽的文件描述符集合: 使用select函數(shù)后,會將 沒有就緒的文件描述符 在集合中 去除

    2024年01月19日
    瀏覽(23)
  • 手撕測試tcp服務(wù)器效率工具——以epoll和io_uring對比為例

    手撕測試tcp服務(wù)器效率工具——以epoll和io_uring對比為例

    服務(wù)器的性能測試主要包括2部分: 并發(fā)量。能容納多大的連接 效率。在不崩壞的情況下能對報文的處理效率。 本文主要進行效率測試,看看基于epoll模型和io_uring模型的tcp服務(wù)器,誰的效率更高。 測試思路 客戶端(一個或多個)大量地向服務(wù)器發(fā)送報文,測試服務(wù)器的處理

    2024年01月18日
    瀏覽(23)
  • 使用Netty構(gòu)建TCP和UDP服務(wù)器和客戶端

    Netty是一個基于Java NIO實現(xiàn)的網(wǎng)絡(luò)通信框架,提供了高性能、低延遲的網(wǎng)絡(luò)通信能力。使用Netty構(gòu)建TCP和UDP服務(wù)器和客戶端非常簡單,下面是一個簡單的示例代碼: 構(gòu)建TCP服務(wù)器 構(gòu)建TCP客戶端 構(gòu)建UDP服務(wù)器 構(gòu)建UDP客戶端 ? 上述示例代碼中,分別定義了一個TCP服務(wù)器、TCP客戶

    2024年02月16日
    瀏覽(23)
  • TCP/IP客戶端和服務(wù)器端建立通信過程

    TCP/IP客戶端和服務(wù)器端建立通信過程

    使用Qt提供的類進行基于 TCP 的套接字通信需要用到兩個類: QTcpServer 類用于監(jiān)聽客戶端連接以及和客戶端建立連接,在使用之前先介紹一下這個類提供的一些常用API函數(shù): 構(gòu)造函數(shù) 給監(jiān)聽的套接字設(shè)置監(jiān)聽 listen() 函數(shù) 在代碼中 通過啟動監(jiān)聽按鈕 設(shè)置監(jiān)聽 參數(shù): address :

    2024年02月07日
    瀏覽(22)
  • Python網(wǎng)絡(luò)編程實戰(zhàn):構(gòu)建TCP服務(wù)器與客戶端

    Python網(wǎng)絡(luò)編程實戰(zhàn):構(gòu)建TCP服務(wù)器與客戶端 在信息化時代,網(wǎng)絡(luò)編程是軟件開發(fā)中不可或缺的一部分。Python作為一種功能強大的編程語言,提供了豐富的網(wǎng)絡(luò)編程庫和工具,使得開發(fā)者能夠輕松構(gòu)建各種網(wǎng)絡(luò)應(yīng)用。本文將詳細(xì)介紹如何在Python中進行網(wǎng)絡(luò)編程,特別是如何使用

    2024年04月15日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包