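1. Preface
This series walks through writing a TCP server program from scratch, building it up one brick at a time.
To keep any single article from running too long, the development of the TCP server is split into stages, each one optimizing and upgrading the last.
The previous chapter showed how to build an efficient server with epoll; this chapter uses epoll to build a reactor network model that handles events asynchronously.
Network concurrency, put simply, is the number of clients a server can carry, that is, the number of simultaneous client connections the server can sustain stably.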
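2. Introduction to Reactor
Developing with the Reactor model is more productive than using IO multiplexing directly. A reactor is usually single-threaded, and the design goal is for one thread to use the full resources of a CPU; the benefit is that event handlers often do not need to worry about mutually exclusive access to shared resources.
The Reactor pattern is a common way to handle concurrent IO, and it is built on synchronous IO. The core idea is to register every IO event of interest with one central IO multiplexer while the main thread or process blocks on that multiplexer. When an event arrives or becomes ready, the multiplexer returns and dispatches the registered IO event to its handler.
Advantages of Reactor:
- Fast response: the reactor never blocks on a single synchronous event, even though the reactor itself is still synchronous.
- Relatively simple programming: it avoids most of the complexity of multithreading and synchronization, and avoids thread and process switching overhead as far as possible.
- Scalability: CPU resources can be used fully by increasing the number of Reactor instances.
- High reusability: the Reactor model is independent of the event-handling logic, so it is easy to reuse.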
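The register-then-dispatch idea described in this section can be sketched in a few lines of C. This is only a minimal illustration, not the reactor built later in this article: `handler_entry`, `demo_register`, `demo_dispatch_once` and `demo_count_reads` are hypothetical names, and only readable events are handled.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical handler type: one callback per registered fd. */
typedef void (*io_handler)(int fd, void *arg);

struct handler_entry {
    int fd;
    io_handler on_readable;
    void *arg;
};

/* Register `entry` with the multiplexer for readability; returns 0 or -1. */
static int demo_register(int epfd, struct handler_entry *entry)
{
    struct epoll_event ev = {0};
    assert(entry != NULL);
    ev.events = EPOLLIN;
    ev.data.ptr = entry;   /* epoll hands this pointer back when the fd is ready */
    return epoll_ctl(epfd, EPOLL_CTL_ADD, entry->fd, &ev);
}

/* Wait once (up to timeout_ms) and dispatch ready events.
 * Returns the number of events reported, or -1 on error. */
static int demo_dispatch_once(int epfd, int timeout_ms)
{
    struct epoll_event ready[16];
    int n = epoll_wait(epfd, ready, 16, timeout_ms);
    for (int i = 0; i < n; i++) {
        struct handler_entry *entry = ready[i].data.ptr;
        if (ready[i].events & EPOLLIN)
            entry->on_readable(entry->fd, entry->arg);
    }
    return n;
}

/* Example handler: consume one byte and count the call. */
static void demo_count_reads(int fd, void *arg)
{
    char byte;
    if (read(fd, &byte, 1) == 1)
        ++*(int *)arg;
}
```

The event loop knows nothing about what a handler does; it only maps a ready fd back to the entry that was registered for it.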
3. Implementation Steps
3.1 Step 1: Define the Reactor data structures
The reactor data structure is laid out as follows.
Structure: items are indexed by fd and stored in blocks. When an fd arrives, fd/MAX gives the number of the block that holds it, and fd%MAX gives the offset within that block. For example, with fd=10000 and MAX=1024 items per block, the block number is 10000/1024=9 and the offset is 10000%1024=784. That locates the item where the fd's data is stored.
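The arithmetic above can be checked with a tiny helper. This is a sketch only: `locate_fd` and `ITEMS_PER_BLOCK` are hypothetical names, with `ITEMS_PER_BLOCK` playing the role of MAX in the text.

```c
#include <assert.h>

#define ITEMS_PER_BLOCK 1024   /* plays the role of MAX in the text */

/* Split an fd into (block number, offset inside that block). */
static void locate_fd(int fd, int *block_id, int *offset)
{
    assert(fd >= 0);
    *block_id = fd / ITEMS_PER_BLOCK;
    *offset   = fd % ITEMS_PER_BLOCK;
}
```

For fd=10000 this yields block 9 and offset 784, matching the worked example.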
The data structures are implemented as follows:
struct socket_item
{
/* data */
int fd; // socket file descriptor
char *write_buffer; // write buffer
char *read_buffer; // read buffer
int write_length; // number of bytes waiting in write_buffer
int read_length; // number of bytes read so far
int status; // 1 if the fd is registered in epoll; selects EPOLL_CTL_MOD or EPOLL_CTL_ADD
int event; // event type being watched
void *arg; // argument passed to the callback
int(*callback)(int fd,int events,void* arg); // callback function
};
struct event_block
{
/* data */
struct socket_item *items; // array of items in this block
struct event_block *next; // pointer to the next block
};
struct net_reactor
{
/* data */
int epollfd; // epoll file descriptor
int block_count; // number of event blocks
int finish_reactor; // non-zero when the service should exit
struct event_block *start_block; // first event block in the list
};
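3.2 Step 2: Initialize the Reactor container
epoll is used as the IO multiplexer here.
Approach: zero the reactor's memory to avoid stale data; allocate and initialize the item array and a block, attach the items to the block, and link the block into the reactor's list.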
// 2.
int init_reactor(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
memset(reactor,0,sizeof(struct net_reactor));
// Create the epoll instance that serves as the IO multiplexer
reactor->epollfd=epoll_create(1);
if(reactor->epollfd==-1){
printf("create epfd in %s error %s\n", __func__, strerror(errno));
return REACTOR_CREATE_EPOLL_FAILED;
}
// Allocate the item array
struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
if(items==NULL)
{
printf("create socket_item in %s error %s\n", __func__, strerror(errno));
close(reactor->epollfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));
// Allocate the event block
struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
if(block==NULL)
{
printf("create block in %s error %s\n", __func__, strerror(errno));
free(items);
close(reactor->epollfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(block,0,sizeof(struct event_block));
block->items=items;
block->next=NULL;
reactor->block_count=1;
reactor->start_block=block;
reactor->finish_reactor=0;
return REACTOR_SUCCESS;
}
3.3 Step 3: Initialize the socket
This is wrapped in a function so that several listening ports can be initialized easily.
// 3.
int init_socket(short port)
{
int fd=socket(AF_INET,SOCK_STREAM,0);
if(fd==-1)
{
printf("create socket in %s error %s\n", __func__, strerror(errno));
return -1;
}
int ret;
// nonblock
int flag = fcntl(fd, F_GETFL, 0);
flag |= O_NONBLOCK;
ret=fcntl(fd, F_SETFL, flag);
if (ret == -1)
{
printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
close(fd);
return -1;
}
// Bind
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_addr.s_addr=htonl(INADDR_ANY);
server.sin_family=AF_INET;
server.sin_port=htons(port);
ret=bind(fd,(struct sockaddr*)&server,sizeof(server));
if(ret==-1)
{
printf("bind() in %s error %s\n", __func__, strerror(errno));
close(fd);
return -1;
}
// Listen
ret=listen(fd,LISTEN_BLK_SIZE);
if(ret==-1)
{
printf("listen failed : %s\n", strerror(errno));
close(fd);
return -1;
}
printf("listen server port : %d\n", port);
return fd;
}
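One optional addition, which init_socket above does not make: setting SO_REUSEADDR before bind() usually lets a restarted server rebind its port while old connections are still in TIME_WAIT. The helper below is a sketch under that assumption, and `enable_reuseaddr` is a hypothetical name.

```c
#include <assert.h>
#include <sys/socket.h>

/* Enable SO_REUSEADDR on `fd`; intended to be called between socket() and bind().
 * Returns 0 on success, -1 on failure (errno set by setsockopt). */
static int enable_reuseaddr(int fd)
{
    int on = 1;
    assert(fd >= 0);
    return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
}
```

In init_socket it would be called right after the socket() check, closing the fd and returning -1 on failure like the other steps.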
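3.4 Step 4: Grow the Reactor dynamically
To support high concurrency the server listens on multiple ports, and under heavy load the reactor container has to grow to hold more connections.
Core idea: walk to the end of the list, allocate and initialize a new item array and block, attach the items to the block, and link the block into the reactor's list.
// 4. Grow the reactor dynamically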
static int reactor_resize(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;
// Walk to the end of the list
struct event_block *cur_block=reactor->start_block;
while(cur_block->next!=NULL)
{
cur_block=cur_block->next;
}
// Allocate the item array
struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
if(items==NULL)
{
printf("create socket_item in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));
// Allocate the event block
struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
if(block==NULL)
{
printf("create block in %s error %s\n", __func__, strerror(errno));
free(items);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(block,0,sizeof(struct event_block));
block->next=NULL;
block->items=items;
cur_block->next=block;
reactor->block_count++;
return REACTOR_SUCCESS;
}
3.5 Step 5: Look up an item in the Reactor
Approach: fd/MAX gives the number of the block that holds the fd, and fd%MAX gives the offset within that block.
For example, with fd=10000 and MAX=1024 items per block, the block number is 10000/1024=9 and the offset is 10000%1024=784. That locates the item where the fd's data is stored.
// 5. Reactor index lookup
static struct socket_item *reactor_index(struct net_reactor *reactor,int socketfd)
{
if(reactor==NULL)
return NULL;
if(reactor->start_block==NULL)
return NULL;
// Number of the block that holds the fd
int block_id=socketfd/MAX_SOCKET_ITEMS;
// Grow automatically while that block does not exist yet
while(block_id>=reactor->block_count)
{
if(reactor_resize(reactor)<0)
{
printf("reactor_resize in %s error %s\n", __func__, strerror(errno));
return NULL;
}
}
// Walk to the block that holds the fd
struct event_block *cur_block=reactor->start_block;
int i=0;
while(i++ !=block_id && cur_block!=NULL)
{
cur_block=cur_block->next;
}
return &cur_block->items[socketfd%MAX_SOCKET_ITEMS];
}
3.6 Step 6: Set the event information
Store the event's details in the data structure, mainly by filling in the key fields of the socket_item structure.
// 6. Set the event information
static void reactor_event_set(int fd,struct socket_item *sockevent,NCALLBACK callback,void *arg)
{
sockevent->arg=arg;
sockevent->callback=callback;
sockevent->event=0;
sockevent->fd=fd;
}
3.7 Step 7: Watch IO events
epoll is the IO multiplexer here, so the event is added to epoll to be watched.
Approach: this is mainly an epoll_ctl call; the requested events are also recorded in the fd's socket_item.
// 7. Register an IO event with epoll
static int reactor_event_add(int epollfd,int events,struct socket_item *sockevent)
{
struct epoll_event ep_events={0,{0}};
ep_events.data.ptr=sockevent;
ep_events.events=events;
sockevent->event=events;
// Choose the epoll_ctl operation: modify if already registered, otherwise add
int options;
if(sockevent->status==1)
{
options=EPOLL_CTL_MOD;
}
else{
options=EPOLL_CTL_ADD;
sockevent->status=1;
}
if(epoll_ctl(epollfd,options,sockevent->fd,&ep_events)<0)
{
printf("event add failed [fd=%d], events[%d]\n", sockevent->fd, events);
printf("event add failed in %s error %s\n", __func__, strerror(errno));
return -1;
}
return 0;
}
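3.8 Step 8: Remove an IO event
Because the sockets are non-blocking, once an event has fired its fd is temporarily removed from epoll so that it does not interfere while the event is being handled.
// 8. Remove an IO event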
static int reactor_event_del(int epollfd,struct socket_item *sockevent)
{
if(sockevent->status!=1)
return -1;
struct epoll_event ep_events={0,{0}};
ep_events.data.ptr=sockevent;
sockevent->status=0;
// Stop watching the fd
if(epoll_ctl(epollfd, EPOLL_CTL_DEL,sockevent->fd, &ep_events)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
return -1;
}
return 0;
}
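The callbacks in this article switch an fd between read and write interest by removing it and then adding it again. As an aside, and not something the original code does, the same switch can be made with a single EPOLL_CTL_MOD call. The sketch below assumes the fd is already registered; `switch_interest` and `demo_switch` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Change the events watched for an fd that is already registered with epfd.
 * Returns 0 on success, -1 on failure (errno set by epoll_ctl). */
static int switch_interest(int epfd, int fd, uint32_t events, void *user_data)
{
    struct epoll_event ev = {0};
    assert(epfd >= 0 && fd >= 0);
    ev.events = events;
    ev.data.ptr = user_data;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

/* Demo: watch one end of a socketpair for input, switch it to output,
 * and return the event mask epoll reports (0 if any step fails). */
static uint32_t demo_switch(void)
{
    int sv[2];
    uint32_t seen = 0;
    struct epoll_event ev = {0}, ready;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
        return 0;
    int epfd = epoll_create1(0);
    if (epfd >= 0) {
        ev.events = EPOLLIN;
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, sv[0], &ev) == 0 &&
            switch_interest(epfd, sv[0], EPOLLOUT, NULL) == 0 &&
            epoll_wait(epfd, &ready, 1, 1000) == 1)
            seen = ready.events;
        close(epfd);
    }
    close(sv[0]);
    close(sv[1]);
    return seen;
}
```

The trade-off is one system call per switch instead of two; the remove-then-add approach used in this article keeps the status flag logic simple.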
3.9 Step 9: Add a listener to the Reactor
Approach: set the fd's event information, then add the event to epoll.
// 9. Add a listening socket to the reactor
int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,sockfd);
if(item==NULL)
{
printf("reactor_index failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_ADD_LISTEN_FAILED;
}
reactor_event_set(sockfd,item,acceptor,reactor);
if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
{
return REACTOR_ADD_LISTEN_FAILED;
}
printf("add listen fd = %d\n",sockfd);
return REACTOR_SUCCESS;
}
3.10 Step 10: Implement the recv callback
Approach: find the item for the fd; receive data with recv; temporarily stop watching the fd; if the receive succeeded, switch the watched event to writable and add it back to the IO multiplexer (epoll); return the number of bytes received.
// 10. recv callback
static int callback_recv(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
return REACTOR_NULL;
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,fd);
if(item==NULL)
{
printf("callback_recv in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}
// Receive data, leaving one byte free for the terminating '\0'
int ret= recv(fd,item->read_buffer,BUFFER_LENGTH-1,0);
// Temporarily stop watching the fd
if(reactor_event_del(reactor->epollfd, item)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_EVENT_DEL_FAILED;
}
if(ret>0)
{
// Terminate the received bytes so they can be printed as a string
item->read_buffer[ret]='\0';
item->read_length+=ret;
printf("recv [%d]:%s\n", fd, item->read_buffer);
// demo
memcpy(item->write_buffer,item->read_buffer,ret);
item->write_buffer[ret]='\0';
item->write_length=ret;
reactor_event_set(fd,item,callback_send,reactor);
if(reactor_event_add(reactor->epollfd,EPOLLOUT,item)<0)
{
printf("reactor_event_add failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_ADD_LISTEN_FAILED;
}
}
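else if(ret==0)
{
printf("recv_cb --> disconnect\n");
free(item->read_buffer);
free(item->write_buffer);
// Clear the pointers so reactor_destory does not free them again
item->read_buffer=NULL;
item->write_buffer=NULL;
close(item->fd);
}
else
{
int err=errno; // save errno before later calls can overwrite it
if(err==EAGAIN || err==EWOULDBLOCK)
{
// No data to read yet: the fd was removed from epoll above, so watch it for reads again
reactor_event_set(fd,item,callback_recv,reactor);
reactor_event_add(reactor->epollfd,EPOLLIN,item);
}
else if (err == ECONNRESET) {
// reactor_event_del(reactor->epollfd, item);
free(item->read_buffer);
free(item->write_buffer);
item->read_buffer=NULL;
item->write_buffer=NULL;
close(item->fd);
}
printf("recv[fd=%d] error[%d]:%s\n", fd, err, strerror(err));
}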
return ret;
}
3.11 Step 11: Implement the send callback
Approach: find the item for the fd; send data with send; temporarily stop watching the fd; if the send succeeded, switch the watched event to readable and add it back to the IO multiplexer (epoll); return the number of bytes sent.
// 11. send callback
static int callback_send(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
{
return REACTOR_NULL;
}
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,fd);
if(item==NULL)
{
printf("reactor_index failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}
int ret=send(fd,item->write_buffer,item->write_length,0);
// Temporarily stop watching the fd
if(reactor_event_del(reactor->epollfd, item)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_EVENT_DEL_FAILED;
}
if (ret > 0)
{
printf("send[fd=%d], [%d]%s\n", fd, ret, item->write_buffer);
reactor_event_set(fd, item, callback_recv, reactor);
reactor_event_add(reactor->epollfd, EPOLLIN, item);
}
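else
{
// Save errno first: the cleanup calls below may overwrite it
int err=errno;
free(item->read_buffer);
free(item->write_buffer);
// Clear the pointers so reactor_destory does not free them again
item->read_buffer=NULL;
item->write_buffer=NULL;
close(fd);
printf("send[fd=%d] error %s\n", fd, strerror(err));
}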
return ret;
}
3.12 Step 12: Implement the accept callback
Approach: call accept to get the client's fd; set the client fd to non-blocking mode; find the item for the fd; set the fd's event information; watch the fd for readability by adding it to the IO multiplexer (epoll).
// 12. accept callback
int callback_accept(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
{
return REACTOR_NULL;
}
struct sockaddr_in client;
socklen_t len=sizeof(client);
int connectfd=accept(fd,(struct sockaddr*)&client,&len);
if(connectfd<0)
{
printf("accept failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_ACCEPT_FAILED;
}
// Set non-blocking mode
int flag = fcntl(connectfd, F_GETFL, 0);
flag |= O_NONBLOCK;
int ret=fcntl(connectfd, F_SETFL, flag);
if (ret == -1)
{
printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
close(connectfd);
return REACTOR_FCNTL_FAILED;
}
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,connectfd);
if(item==NULL)
{
printf("reactor_index in %s error %s\n", __func__, strerror(errno));
close(connectfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
// Set the fd's event information
reactor_event_set(connectfd,item,callback_recv,reactor);
// Add the event to epoll
if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
{
close(connectfd);
return REACTOR_ADD_LISTEN_FAILED;
}
// Allocate the read and write buffers for the fd
item->read_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
if(item->read_buffer==NULL)
{
printf("malloc in %s error %s\n", __func__, strerror(errno));
close(connectfd);
item->status=0; // closing the fd drops it from epoll, so mark the item unregistered
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(item->read_buffer,0,BUFFER_LENGTH * sizeof(char));
item->read_length=0;
item->write_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
if(item->write_buffer==NULL)
{
printf("malloc in %s error %s\n", __func__, strerror(errno));
close(connectfd);
item->status=0;
free(item->read_buffer);
item->read_buffer=NULL; // avoid a double free in reactor_destory
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(item->write_buffer,0,BUFFER_LENGTH * sizeof(char));
item->write_length=0;
printf("new connect [%s:%d], pos[%d]\n",inet_ntoa(client.sin_addr), ntohs(client.sin_port), connectfd);
return REACTOR_SUCCESS;
}
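The non-blocking setup appears in both init_socket and callback_accept, and neither checks the result of the F_GETFL call. A shared helper along the following lines could cover both places. It is a sketch, and `set_nonblocking` is a hypothetical name that does not appear in the original code.

```c
#include <assert.h>
#include <fcntl.h>

/* Put `fd` into non-blocking mode, checking both fcntl calls.
 * Returns 0 on success, -1 on failure (errno set by fcntl). */
static int set_nonblocking(int fd)
{
    assert(fd >= 0);
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}
```

A caller would close the fd and return its own error code when the helper returns -1, as the existing code already does after a failed F_SETFL.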
3.13 Step 13: Implement the reactor run loop
This is mainly epoll's wait function: each event that is reported is handled by invoking its callback.
// 13. reactor run loop
int reactor_run(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;
if(reactor->epollfd<0)
return REACTOR_CREATE_EPOLL_FAILED;
struct epoll_event events[MAX_EPOLL_EVENTS + 1];
while(!reactor->finish_reactor)
{
int nready=epoll_wait(reactor->epollfd,events,MAX_EPOLL_EVENTS,1000);
if(nready<0)
{
printf("epoll wait error\n");
continue;
}
int i=0;
for(i=0;i<nready;i++)
{
struct socket_item *item=(struct socket_item *)events[i].data.ptr;
if((events[i].events & EPOLLIN) && (item->event&EPOLLIN))
{
// Handle a readable event
item->callback(item->fd, events[i].events, item->arg);
}
if((events[i].events & EPOLLOUT) && (item->event&EPOLLOUT))
{
// Handle a writable event
item->callback(item->fd, events[i].events, item->arg);
}
}
}
printf("Clearing memory......\n");
reactor_destory(reactor);
printf("finish reactor\n");
return REACTOR_SUCCESS;
}
3.14 Step 14: Destroy the reactor
// 14. reactor teardown
int reactor_destory(struct net_reactor *reactor)
{
// Close epoll
close(reactor->epollfd);
struct event_block *blk= reactor->start_block;
struct event_block *next;
while (blk != NULL)
{
next = blk->next;
// Free the buffers of every item in the block
for(int i=0;i<MAX_SOCKET_ITEMS;i++)
{
if(blk->items[i].read_buffer!=NULL)
{
free(blk->items[i].read_buffer);
blk->items[i].read_buffer=NULL;
}
if(blk->items[i].write_buffer!=NULL)
{
free(blk->items[i].write_buffer);
blk->items[i].write_buffer=NULL;
}
}
free(blk->items);
free(blk);
blk = next;
}
return REACTOR_SUCCESS;
}
3.15 Usage example
- Create a struct net_reactor object.
- Call init_reactor to initialize it.
- Call init_socket to listen on a port.
- Call reactor_add_listener to put the listening socket under the reactor's management.
- Call reactor_run to run the reactor service.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "reactor.h"
#define SERVER_PORT 9703
#define PORT_COUNT 2
int main(int argc,char **argv)
{
struct net_reactor *reactor=(struct net_reactor *)malloc(sizeof(struct net_reactor));
if(reactor==NULL)
{
perror("malloc struct net_reactor failed!\n");
return REACTOR_MALLOC_MEMORY_FAILED;
}
if(init_reactor(reactor)<0)
{
free(reactor);
return REACTOR_NULL;
}
unsigned short port = SERVER_PORT;
int sockfds[PORT_COUNT]={0};
int i=0;
for(i=0;i<PORT_COUNT;i++)
{
sockfds[i]=init_socket(port+i);
if(sockfds[i]>0)
{
if(reactor_add_listener(reactor,sockfds[i],callback_accept)<0)
printf("reactor_add_listener failed in %s : %d\n",__func__,__LINE__);
}
else
{
printf("init_socket failed in %s : %d\n",__func__,__LINE__);
}
}
reactor_run(reactor);
// Destroy the reactor (reactor_run already calls reactor_destory before it returns)
//reactor_destory(reactor);
reactor->finish_reactor=1;
// Close the listening sockets
for(i=0;i<PORT_COUNT;i++)
{
close(sockfds[i]);
}
// Free the reactor
free(reactor);
return 0;
}
Compile:
gcc -o server main.c reactor.c
4. Complete Code
reactor.h
#ifndef _REACTOR_H_
#define _REACTOR_H_
enum REACTOR_ERROR_CODE{
REACTOR_EVENT_DEL_FAILED=-7,
REACTOR_FCNTL_FAILED=-6,
REACTOR_ACCEPT_FAILED=-5,
REACTOR_ADD_LISTEN_FAILED=-4,
REACTOR_MALLOC_MEMORY_FAILED=-3,
REACTOR_CREATE_EPOLL_FAILED=-2,
REACTOR_NULL=-1,
REACTOR_SUCCESS=0
};
struct socket_item
{
/* data */
int fd; // socket file descriptor
char *write_buffer; // write buffer
char *read_buffer; // read buffer
int write_length; // number of bytes waiting in write_buffer
int read_length; // number of bytes read so far
int status; // 1 if the fd is registered in epoll; selects EPOLL_CTL_MOD or EPOLL_CTL_ADD
int event; // event type being watched
void *arg; // argument passed to the callback
int(*callback)(int fd,int events,void* arg); // callback function
};
struct event_block
{
/* data */
struct socket_item *items; // array of items in this block
struct event_block *next; // pointer to the next block
};
struct net_reactor
{
/* data */
int epollfd; // epoll file descriptor
int block_count; // number of event blocks
int finish_reactor; // non-zero when the service should exit
struct event_block *start_block; // first event block in the list
};
typedef int NCALLBACK(int ,int, void*);
int init_reactor(struct net_reactor *reactor);
int init_socket(short port);
int callback_accept(int fd, int events, void *arg);
int reactor_destory(struct net_reactor *reactor);
int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor);
int reactor_run(struct net_reactor *reactor);
static int callback_send(int fd, int events, void *arg);
static int callback_recv(int fd, int events, void *arg);
#endif
reactor.c
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include "reactor.h"
#define MAX_SOCKET_ITEMS 1024
#define LISTEN_BLK_SIZE 20
#define BUFFER_LENGTH 1024
#define MAX_EPOLL_EVENTS 1024
// 2.
int init_reactor(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
memset(reactor,0,sizeof(struct net_reactor));
// Create the epoll instance that serves as the IO multiplexer
reactor->epollfd=epoll_create(1);
if(reactor->epollfd==-1){
printf("create epfd in %s error %s\n", __func__, strerror(errno));
return REACTOR_CREATE_EPOLL_FAILED;
}
// Allocate the item array
struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
if(items==NULL)
{
printf("create socket_item in %s error %s\n", __func__, strerror(errno));
close(reactor->epollfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));
// Allocate the event block
struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
if(block==NULL)
{
printf("create block in %s error %s\n", __func__, strerror(errno));
free(items);
close(reactor->epollfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(block,0,sizeof(struct event_block));
block->items=items;
block->next=NULL;
reactor->block_count=1;
reactor->start_block=block;
reactor->finish_reactor=0;
return REACTOR_SUCCESS;
}
// 3.
int init_socket(short port)
{
int fd=socket(AF_INET,SOCK_STREAM,0);
if(fd==-1)
{
printf("create socket in %s error %s\n", __func__, strerror(errno));
return -1;
}
int ret;
// nonblock
int flag = fcntl(fd, F_GETFL, 0);
flag |= O_NONBLOCK;
ret=fcntl(fd, F_SETFL, flag);
if (ret == -1)
{
printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
close(fd);
return -1;
}
// Bind
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_addr.s_addr=htonl(INADDR_ANY);
server.sin_family=AF_INET;
server.sin_port=htons(port);
ret=bind(fd,(struct sockaddr*)&server,sizeof(server));
if(ret==-1)
{
printf("bind() in %s error %s\n", __func__, strerror(errno));
close(fd);
return -1;
}
// Listen
ret=listen(fd,LISTEN_BLK_SIZE);
if(ret==-1)
{
printf("listen failed : %s\n", strerror(errno));
close(fd);
return -1;
}
printf("listen server port : %d\n", port);
return fd;
}
// 4. Grow the reactor dynamically
static int reactor_resize(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;
// Walk to the end of the list
struct event_block *cur_block=reactor->start_block;
while(cur_block->next!=NULL)
{
cur_block=cur_block->next;
}
// Allocate the item array
struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
if(items==NULL)
{
printf("create socket_item in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));
// Allocate the event block
struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
if(block==NULL)
{
printf("create block in %s error %s\n", __func__, strerror(errno));
free(items);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(block,0,sizeof(struct event_block));
block->next=NULL;
block->items=items;
cur_block->next=block;
reactor->block_count++;
return REACTOR_SUCCESS;
}
// 5. Reactor index lookup
static struct socket_item *reactor_index(struct net_reactor *reactor,int socketfd)
{
if(reactor==NULL)
return NULL;
if(reactor->start_block==NULL)
return NULL;
// Number of the block that holds the fd
int block_id=socketfd/MAX_SOCKET_ITEMS;
// Grow automatically while that block does not exist yet
while(block_id>=reactor->block_count)
{
if(reactor_resize(reactor)<0)
{
printf("reactor_resize in %s error %s\n", __func__, strerror(errno));
return NULL;
}
}
// Walk to the block that holds the fd
struct event_block *cur_block=reactor->start_block;
int i=0;
while(i++ !=block_id && cur_block!=NULL)
{
cur_block=cur_block->next;
}
return &cur_block->items[socketfd%MAX_SOCKET_ITEMS];
}
// 6. Set the event information
static void reactor_event_set(int fd,struct socket_item *sockevent,NCALLBACK callback,void *arg)
{
sockevent->arg=arg;
sockevent->callback=callback;
sockevent->event=0;
sockevent->fd=fd;
}
// 7. Register an IO event with epoll
static int reactor_event_add(int epollfd,int events,struct socket_item *sockevent)
{
struct epoll_event ep_events={0,{0}};
ep_events.data.ptr=sockevent;
ep_events.events=events;
sockevent->event=events;
// Choose the epoll_ctl operation: modify if already registered, otherwise add
int options;
if(sockevent->status==1)
{
options=EPOLL_CTL_MOD;
}
else{
options=EPOLL_CTL_ADD;
sockevent->status=1;
}
if(epoll_ctl(epollfd,options,sockevent->fd,&ep_events)<0)
{
printf("event add failed [fd=%d], events[%d]\n", sockevent->fd, events);
printf("event add failed in %s error %s\n", __func__, strerror(errno));
return -1;
}
return 0;
}
// 8. Remove an IO event
static int reactor_event_del(int epollfd,struct socket_item *sockevent)
{
if(sockevent->status!=1)
return -1;
struct epoll_event ep_events={0,{0}};
ep_events.data.ptr=sockevent;
sockevent->status=0;
// Stop watching the fd
if(epoll_ctl(epollfd, EPOLL_CTL_DEL,sockevent->fd, &ep_events)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
return -1;
}
return 0;
}
// 9. Add a listening socket to the reactor
int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,sockfd);
if(item==NULL)
{
printf("reactor_index failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_ADD_LISTEN_FAILED;
}
reactor_event_set(sockfd,item,acceptor,reactor);
if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
{
return REACTOR_ADD_LISTEN_FAILED;
}
printf("add listen fd = %d\n",sockfd);
return REACTOR_SUCCESS;
}
// 10. recv callback
static int callback_recv(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
return REACTOR_NULL;
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,fd);
if(item==NULL)
{
printf("callback_recv in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}
// Receive data, leaving one byte free for the terminating '\0'
int ret= recv(fd,item->read_buffer,BUFFER_LENGTH-1,0);
// Temporarily stop watching the fd
if(reactor_event_del(reactor->epollfd, item)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_EVENT_DEL_FAILED;
}
if(ret>0)
{
// Terminate the received bytes so they can be printed as a string
item->read_buffer[ret]='\0';
item->read_length+=ret;
printf("recv [%d]:%s\n", fd, item->read_buffer);
// demo
memcpy(item->write_buffer,item->read_buffer,ret);
item->write_buffer[ret]='\0';
item->write_length=ret;
reactor_event_set(fd,item,callback_send,reactor);
if(reactor_event_add(reactor->epollfd,EPOLLOUT,item)<0)
{
printf("reactor_event_add failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_ADD_LISTEN_FAILED;
}
}
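else if(ret==0)
{
printf("recv_cb --> disconnect\n");
free(item->read_buffer);
free(item->write_buffer);
// Clear the pointers so reactor_destory does not free them again
item->read_buffer=NULL;
item->write_buffer=NULL;
close(item->fd);
}
else
{
int err=errno; // save errno before later calls can overwrite it
if(err==EAGAIN || err==EWOULDBLOCK)
{
// No data to read yet: the fd was removed from epoll above, so watch it for reads again
reactor_event_set(fd,item,callback_recv,reactor);
reactor_event_add(reactor->epollfd,EPOLLIN,item);
}
else if (err == ECONNRESET) {
// reactor_event_del(reactor->epollfd, item);
free(item->read_buffer);
free(item->write_buffer);
item->read_buffer=NULL;
item->write_buffer=NULL;
close(item->fd);
}
printf("recv[fd=%d] error[%d]:%s\n", fd, err, strerror(err));
}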
return ret;
}
// 11. send callback
static int callback_send(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
{
return REACTOR_NULL;
}
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,fd);
if(item==NULL)
{
printf("reactor_index failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}
int ret=send(fd,item->write_buffer,item->write_length,0);
// Temporarily stop watching the fd
if(reactor_event_del(reactor->epollfd, item)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_EVENT_DEL_FAILED;
}
if (ret > 0)
{
printf("send[fd=%d], [%d]%s\n", fd, ret, item->write_buffer);
reactor_event_set(fd, item, callback_recv, reactor);
reactor_event_add(reactor->epollfd, EPOLLIN, item);
}
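else
{
// Save errno first: the cleanup calls below may overwrite it
int err=errno;
free(item->read_buffer);
free(item->write_buffer);
// Clear the pointers so reactor_destory does not free them again
item->read_buffer=NULL;
item->write_buffer=NULL;
close(fd);
printf("send[fd=%d] error %s\n", fd, strerror(err));
}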
return ret;
}
// 12. accept callback
int callback_accept(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
{
return REACTOR_NULL;
}
struct sockaddr_in client;
socklen_t len=sizeof(client);
int connectfd=accept(fd,(struct sockaddr*)&client,&len);
if(connectfd<0)
{
printf("accept failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_ACCEPT_FAILED;
}
// Set non-blocking mode
int flag = fcntl(connectfd, F_GETFL, 0);
flag |= O_NONBLOCK;
int ret=fcntl(connectfd, F_SETFL, flag);
if (ret == -1)
{
printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
close(connectfd);
return REACTOR_FCNTL_FAILED;
}
// Find the item for this fd
struct socket_item *item=reactor_index(reactor,connectfd);
if(item==NULL)
{
printf("reactor_index in %s error %s\n", __func__, strerror(errno));
close(connectfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
// Set the fd's event information
reactor_event_set(connectfd,item,callback_recv,reactor);
// Add the event to epoll
if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
{
close(connectfd);
return REACTOR_ADD_LISTEN_FAILED;
}
// Allocate the read and write buffers for the fd
item->read_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
if(item->read_buffer==NULL)
{
printf("malloc in %s error %s\n", __func__, strerror(errno));
close(connectfd);
item->status=0; // closing the fd drops it from epoll, so mark the item unregistered
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(item->read_buffer,0,BUFFER_LENGTH * sizeof(char));
item->read_length=0;
item->write_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
if(item->write_buffer==NULL)
{
printf("malloc in %s error %s\n", __func__, strerror(errno));
close(connectfd);
item->status=0;
free(item->read_buffer);
item->read_buffer=NULL; // avoid a double free in reactor_destory
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(item->write_buffer,0,BUFFER_LENGTH * sizeof(char));
item->write_length=0;
printf("new connect [%s:%d], pos[%d]\n",inet_ntoa(client.sin_addr), ntohs(client.sin_port), connectfd);
return REACTOR_SUCCESS;
}
// 13. reactor run loop
int reactor_run(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;
if(reactor->epollfd<0)
return REACTOR_CREATE_EPOLL_FAILED;
struct epoll_event events[MAX_EPOLL_EVENTS + 1];
while(!reactor->finish_reactor)
{
int nready=epoll_wait(reactor->epollfd,events,MAX_EPOLL_EVENTS,1000);
if(nready<0)
{
printf("epoll wait error\n");
continue;
}
int i=0;
for(i=0;i<nready;i++)
{
struct socket_item *item=(struct socket_item *)events[i].data.ptr;
if((events[i].events & EPOLLIN) && (item->event&EPOLLIN))
{
// Handle a readable event
item->callback(item->fd, events[i].events, item->arg);
}
if((events[i].events & EPOLLOUT) && (item->event&EPOLLOUT))
{
// Handle a writable event
item->callback(item->fd, events[i].events, item->arg);
}
}
}
printf("Clearing memory......\n");
reactor_destory(reactor);
printf("finish reactor\n");
return REACTOR_SUCCESS;
}
// 14. reactor teardown
int reactor_destory(struct net_reactor *reactor)
{
// Close epoll
close(reactor->epollfd);
struct event_block *blk= reactor->start_block;
struct event_block *next;
while (blk != NULL)
{
next = blk->next;
// Free the buffers of every item in the block
for(int i=0;i<MAX_SOCKET_ITEMS;i++)
{
if(blk->items[i].read_buffer!=NULL)
{
free(blk->items[i].read_buffer);
blk->items[i].read_buffer=NULL;
}
if(blk->items[i].write_buffer!=NULL)
{
free(blk->items[i].write_buffer);
blk->items[i].write_buffer=NULL;
}
}
free(blk->items);
free(blk);
blk = next;
}
return REACTOR_SUCCESS;
}
main.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "reactor.h"
#define SERVER_PORT 9703
#define PORT_COUNT 2
int main(int argc,char **argv)
{
struct net_reactor *reactor=(struct net_reactor *)malloc(sizeof(struct net_reactor));
if(reactor==NULL)
{
perror("malloc struct net_reactor failed!\n");
return REACTOR_MALLOC_MEMORY_FAILED;
}
if(init_reactor(reactor)<0)
{
free(reactor);
return REACTOR_NULL;
}
unsigned short port = SERVER_PORT;
int sockfds[PORT_COUNT]={0};
int i=0;
for(i=0;i<PORT_COUNT;i++)
{
sockfds[i]=init_socket(port+i);
if(sockfds[i]>0)
{
if(reactor_add_listener(reactor,sockfds[i],callback_accept)<0)
printf("reactor_add_listener failed in %s : %d\n",__func__,__LINE__);
}
else
{
printf("init_socket failed in %s : %d\n",__func__,__LINE__);
}
}
reactor_run(reactor);
// Destroy the reactor (reactor_run already calls reactor_destory before it returns)
//reactor_destory(reactor);
reactor->finish_reactor=1;
// Close the listening sockets
for(i=0;i<PORT_COUNT;i++)
{
close(sockfds[i]);
}
// Free the reactor
free(reactor);
return 0;
}
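5. Million-Connection Concurrency Test
The test uses a home-made concurrent-connection client program, run on three test machines at once, each holding 300,000+ connections on average. Note that, to make sure enough fds can be allocated, the file descriptor limit has to be raised with the following command: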
ulimit -n 1024000
This limit applies only to the current session; after logging in again it reverts to the system default.
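The same limit can also be inspected and raised from inside a program with getrlimit and setrlimit. The sketch below is an optional alternative to the shell command, not part of the original test setup; `ensure_nofile_limit` is a hypothetical name, and an unprivileged process can raise only the soft limit, and only up to the hard limit.

```c
#include <assert.h>
#include <sys/resource.h>

/* Make sure the soft RLIMIT_NOFILE is at least `wanted`, clamped to the hard limit.
 * Returns 0 on success, -1 on failure (errno set by getrlimit or setrlimit).
 * Note: when `wanted` exceeds the hard limit, the soft limit ends up at the hard limit. */
static int ensure_nofile_limit(rlim_t wanted)
{
    struct rlimit lim;
    assert(wanted > 0);
    if (getrlimit(RLIMIT_NOFILE, &lim) == -1)
        return -1;
    if (lim.rlim_cur >= wanted)
        return 0;                  /* already high enough */
    if (wanted > lim.rlim_max)
        wanted = lim.rlim_max;     /* cannot go above the hard limit without privilege */
    lim.rlim_cur = wanted;
    return setrlimit(RLIMIT_NOFILE, &lim);
}
```

A server could call this once at startup, before init_reactor, and log the limit it actually obtained.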
To change the fd limit permanently, set it in /etc/security/limits.conf. Open that file and append the following lines:
<username> hard nofile 1024000
<username> soft nofile 1024000
Also, to show what the reactor can do, turn off unnecessary printing, because printing hurts performance.
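One way to make the printing switchable, offered as a sketch rather than as the approach the original code takes, is to route messages through a small logger that is silent unless a flag is set. `reactor_verbose` and `reactor_logf` are hypothetical names.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>

static int reactor_verbose = 0;   /* hypothetical switch: 0 = quiet, 1 = print */

/* printf-style logger that does nothing when reactor_verbose is 0.
 * Returns the number of characters written (0 when disabled). */
static int reactor_logf(const char *fmt, ...)
{
    assert(fmt != NULL);
    if (!reactor_verbose)
        return 0;
    va_list ap;
    va_start(ap, fmt);
    int written = vfprintf(stderr, fmt, ap);
    va_end(ap);
    return written;
}
```

With the flag left at 0, each call in the hot path costs a function call and a branch and performs no formatting or IO.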
Client test program:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>
#define MAX_BUFFER 128
#define MAX_EPOLLSIZE (384*1024)
#define MAX_PORT 1
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int isContinue = 0;
static int ntySetNonblock(int fd) {
int flags;
flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) return -1;
return 0;
}
static int ntySetReUseAddr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
int main(int argc, char **argv) {
if (argc <= 2) {
printf("Usage: %s ip port\n", argv[0]);
exit(0);
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int connections = 0;
char buffer[128] = {0};
int i = 0, index = 0;
struct epoll_event events[MAX_EPOLLSIZE];
int epoll_fd = epoll_create(MAX_EPOLLSIZE);
strcpy(buffer, " Data From MulClient\n");
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
while (1) {
if (++index >= MAX_PORT) index = 0;
struct epoll_event ev;
int sockfd = 0;
if (connections < 340000 && !isContinue) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket");
goto err;
}
//ntySetReUseAddr(sockfd);
addr.sin_port = htons(port+index);
if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
perror("connect");
goto err;
}
ntySetNonblock(sockfd);
ntySetReUseAddr(sockfd);
sprintf(buffer, "Hello Server: client --> %d\n", connections);
send(sockfd, buffer, strlen(buffer), 0);
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLOUT;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
connections ++;
}
//connections ++;
if (connections % 1000 == 999 || connections >= 340000) {
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
gettimeofday(&tv_begin, NULL);
int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);
int nfds = epoll_wait(epoll_fd, events, connections, 100);
for (i = 0;i < nfds;i ++) {
int clientfd = events[i].data.fd;
if (events[i].events & EPOLLOUT) {
sprintf(buffer, "data from %d\n", clientfd);
send(clientfd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rBuffer[MAX_BUFFER] = {0};
ssize_t length = recv(clientfd, rBuffer, MAX_BUFFER, 0);
if (length > 0) {
printf(" RecvBuffer:%s\n", rBuffer);
if (!strcmp(rBuffer, "quit")) {
isContinue = 0;
}
} else if (length == 0) {
printf(" Disconnect clientfd:%d\n", clientfd);
connections --;
close(clientfd);
} else {
if (errno == EINTR) continue;
printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
} else {
printf(" clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
}
}
usleep(500);
}
return 0;
err:
printf("error : %s\n", strerror(errno));
return 0;
}
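Summary
With that, we have a server program that supports high concurrency. It still has limitations, though, and we will keep improving and optimizing it. Before doing so we need a background logging module, which every server program requires, so the next chapter develops an efficient background logging module.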