1、什么是Reactor模型
? ? ? ? Reactor意思是“反應(yīng)堆”,是一種事件驅(qū)動(dòng)機(jī)制。
????????和普通函數(shù)調(diào)用的不同之處在于:應(yīng)用程序不是主動(dòng)的調(diào)用某個(gè) API 完成處理,而是恰恰相反,Reactor逆置了事件處理流程,應(yīng)用程序需要提供相應(yīng)的接口并注冊(cè)到 Reactor 上,如果相應(yīng)的時(shí)間發(fā)生,Reactor將主動(dòng)調(diào)用應(yīng)用程序注冊(cè)的接口,這些接口又稱(chēng)為“回調(diào)函數(shù)”。
????????對(duì)于剛開(kāi)始接觸這個(gè)機(jī)制,個(gè)人感覺(jué)翻譯成“感應(yīng)器”可能會(huì)更好理解一點(diǎn),因?yàn)樽?cè)在Reactor上的函數(shù)就像感應(yīng)器一樣,只要有事件到達(dá),就會(huì)觸發(fā)它開(kāi)始工作。
????????Reactor 模式是編寫(xiě)高性能網(wǎng)絡(luò)服務(wù)器的必備技術(shù)之一。
2、Reactor模型的優(yōu)點(diǎn)
- 響應(yīng)快,不必為單個(gè)同步時(shí)間所阻塞,雖然 Reactor 本身依然是同步的;
- 編程相對(duì)簡(jiǎn)單,可以最大程度的避免復(fù)雜的多線(xiàn)程及同步問(wèn)題,并且避免了多線(xiàn)程/進(jìn)程的切換開(kāi)銷(xiāo);
- 可擴(kuò)展性強(qiáng),可以方便的通過(guò)增加 Reactor 實(shí)例個(gè)數(shù)來(lái)充分利用 CPU 資源;
-
可復(fù)用性高,reactor 框架本身與具體事件處理邏輯無(wú)關(guān),具有很高的復(fù)用性;
? ? ? ?Reactor 模型開(kāi)發(fā)效率上比起直接使用 IO 復(fù)用要高,它通常是單線(xiàn)程的,設(shè)計(jì)目標(biāo)是希望單線(xiàn)程使用一顆 CPU 的全部資源。? ? ? ? 優(yōu)點(diǎn)即每個(gè)事件處理中很多時(shí)候可以不考慮共享資源的互斥訪(fǎng)問(wèn)??墒侨秉c(diǎn)也是明顯的,現(xiàn)在的硬件發(fā)展,已經(jīng)不再遵循摩爾定律,CPU 的頻率受制于材料的限制不再有大的提升,而改為是從核數(shù)的增加上提升能力,當(dāng)程序需要使用多核資源時(shí),Reactor 模型就會(huì)悲劇 , 為什么呢?? ? ? ? 如果程序業(yè)務(wù)很簡(jiǎn)單,例如只是簡(jiǎn)單的訪(fǎng)問(wèn)一些提供了并發(fā)訪(fǎng)問(wèn)的服務(wù),就可以直接開(kāi)啟多個(gè)反應(yīng)堆,每個(gè)反應(yīng)堆對(duì)應(yīng)一顆 CPU 核心,這些反應(yīng)堆上跑的請(qǐng)求互不相關(guān),這是完全可以利用多核的。例如 Nginx 這樣的 http 靜態(tài)服務(wù)器。
3、通過(guò)對(duì)網(wǎng)絡(luò)編程(epoll)代碼的優(yōu)化,深入理解Reactor模型
1、epoll的普通版本,根據(jù)fd類(lèi)型(listen_fd和client_fd)分為兩大類(lèi)處理。
????????如果是listen_fd,調(diào)用accept處理連接請(qǐng)求;
????????如果是client_fd,調(diào)用recv或者send處理數(shù)據(jù)。
?????????代碼實(shí)現(xiàn):
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
int main(int argc, char* argv[])
{
if (argc < 2)
return -1;
int port = atoi(argv[1]); //字符串轉(zhuǎn)換為整型
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
return -1;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in)); //新申請(qǐng)的空間一定要置零
addr.sin_family = AF_INET;
addr.sin_port = htons(port); //轉(zhuǎn)換成網(wǎng)絡(luò)字節(jié)序
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
return -2;
if (listen(sockfd, 5) < 0)
return -3;
//epoll
int epfd = epoll_create(1); //創(chuàng)建epoll,相當(dāng)于紅黑樹(shù)的根節(jié)點(diǎn)
struct epoll_event ev, events[1024] = {0}; //events相當(dāng)于就緒隊(duì)列,一次性可以處理的集合
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); //將ev節(jié)點(diǎn)加入到epoll,此處的sockfd參數(shù)隨便添加沒(méi)有意義,需要操作系統(tǒng)索引和它有對(duì)應(yīng)的句柄
while (1)
{
int nready = epoll_wait(epfd, events, 1024, -1); //第四個(gè)參數(shù)-1表示一直等待,有事件才返回
if (nready < 1) //沒(méi)有事件觸發(fā),nready代表觸發(fā)事件的個(gè)數(shù)
break;
int i = 0;
for (i = 0; i < nready; i++) //epoll_wait帶出的就緒fd包括兩大類(lèi):1、處理連接的listen_fd,2、處理數(shù)據(jù)的send和recv
{
if (events[i].data.fd == sockfd) //如果是listenfd,就將它加入到epoll
{
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd <= 0)
continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));
ev.events = EPOLLIN | EPOLLET; //epoll默認(rèn)是LT模式
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
}
else //fd進(jìn)行讀寫(xiě)操作
{
//對(duì)fd的讀寫(xiě)操作沒(méi)有分開(kāi)
int client_fd = events[i].data.fd;
char buf[1024] = {0};
int ret = recv(client_fd, buf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//
}
else
{
//
}
printf("ret < 0,斷開(kāi)連接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);
}
else if (ret == 0) //接收到了客戶(hù)端發(fā)來(lái)的斷開(kāi)連接請(qǐng)求FIN后,沒(méi)有及時(shí)調(diào)用close函數(shù),進(jìn)入了CLOSE _WAIT狀態(tài)
{
printf("ret = 0,斷開(kāi)連接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close關(guān)閉連接后要將它既是從epoll中刪除
}
else
{
printf("Recv: %s, %d Bytes\n", buf, ret);
}
//區(qū)分fd的讀寫(xiě)操作,即recv和send
if (events[i].events & EPOLLIN)
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
int ret = recv(client_fd, buf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//...
}
else
{
//...
}
printf("ret < 0,斷開(kāi)連接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);
}
else if (ret == 0) //接收到了客戶(hù)端發(fā)來(lái)的斷開(kāi)連接請(qǐng)求FIN后,沒(méi)有及時(shí)調(diào)用close函數(shù),進(jìn)入了CLOSE _WAIT狀態(tài)
{
printf("ret = 0,斷開(kāi)連接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close關(guān)閉連接后要將它既是從epoll中刪除
}
else
{
printf("Recv: %s, %d Bytes\n", buf, ret);
}
}
if (events[i].events & EPOLLOUT) //為什么需要判斷EPOLLOUT,而不是直接else?因?yàn)橐粋€(gè)fd有可能同時(shí)存在可讀和可寫(xiě)事件的
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
send(client_fd, buf, sizeof(buf), 0);
}
}
}
}
return 0;
}
?
2、epoll的優(yōu)化版本,根據(jù)事件類(lèi)型(讀和寫(xiě))分為兩大類(lèi)處理。
?????????代碼實(shí)現(xiàn):
for (i = 0; i < nready; i++) //epoll_wait帶出的就緒fd包括兩大類(lèi):1、處理連接的listen_fd,2、處理數(shù)據(jù)的send和recv
{
//區(qū)分fd的讀寫(xiě)操作
if (events[i].events & EPOLLIN)
{
if (events[i].data.fd == sockfd) //如果是listenfd,就將它加入到epoll
{
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd <= 0)
continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));
ev.events = EPOLLIN | EPOLLET; //epoll默認(rèn)是LT模式
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
}
else
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
int ret = recv(client_fd, buf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//...
}
else
{
//...
}
printf("ret < 0,斷開(kāi)連接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev);
}
else if (ret == 0) //接收到了客戶(hù)端發(fā)來(lái)的斷開(kāi)連接請(qǐng)求FIN后,沒(méi)有及時(shí)調(diào)用close函數(shù),進(jìn)入了CLOSE _WAIT狀態(tài)
{
printf("ret = 0,斷開(kāi)連接:%d\n", client_fd);
close(client_fd);
ev.events = EPOLLIN;
ev.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev); //close關(guān)閉連接后要將它既是從epoll中刪除
}
else
{
printf("Recv: %s, %d Bytes\n", buf, ret);
}
}
}
//為什么需要判斷EPOLLOUT,而不是直接else?因?yàn)橐粋€(gè)fd有可能同時(shí)存在可讀和可寫(xiě)事件的
if (events[i].events & EPOLLOUT)
{
int client_fd = events[i].data.fd;
char buf[1024] = {0};
send(client_fd, buf, sizeof(buf), 0);
}
}
?
3、epoll的Reactor模式,?epoll由以前的對(duì)網(wǎng)絡(luò)io(fd)進(jìn)行管理,轉(zhuǎn)變成對(duì)events事件進(jìn)行管理。
?????????代碼實(shí)現(xiàn):
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
//每個(gè)fd所對(duì)應(yīng)的信息
struct sockitem
{
int sockfd;
int (*callback)(int fd, int events, void*arg);
char sendbuf[1024];
char recvbuf[1024];
};
//每個(gè)epoll所對(duì)應(yīng)的信息
struct epollitem
{
int epfd;
struct epoll_event events[1024]; //events相當(dāng)于就緒隊(duì)列,一次性可以處理的集合
};
struct epollitem *eventloop = NULL;
int recv_cb(int fd, int events, void*arg);
int send_cb(int fd, int events, void*arg);
int accept_cb(int fd, int events, void*arg)
{
printf("---accept_cb(int fd, int events, void*arg)---\n");
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd <= 0)
return -1;
char str[INET_ADDRSTRLEN] = {0};
printf("recv from IP = %s ,at Port= %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port));
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; //epoll默認(rèn)是LT模式
struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = client_fd;
si->callback = recv_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, client_fd, &ev);
return client_fd;
}
int recv_cb(int fd, int events, void*arg)
{
printf("---recv_cb(int fd, int events, void*arg)---\n");
struct epoll_event ev;
struct sockitem *sit = (struct sockitem*)arg;
int ret = recv(fd, sit->recvbuf, 1024, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//...
}
else
{
//...
}
printf("ret < 0,斷開(kāi)連接:%d\n", fd);
ev.events = EPOLLIN;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev); //close關(guān)閉連接后要將它既是從epoll中刪除
close(fd);
free(sit); //連接關(guān)閉后釋放內(nèi)存
}
else if (ret == 0) //接收到了客戶(hù)端發(fā)來(lái)的斷開(kāi)連接請(qǐng)求FIN后,沒(méi)有及時(shí)調(diào)用close函數(shù),進(jìn)入了CLOSE _WAIT狀態(tài)
{
printf("ret = 0,斷開(kāi)連接:%d\n", fd);
ev.events = EPOLLIN;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
close(fd);
free(sit);
}
else
{
printf("Recv from recvbuf: %s, %d Bytes\n", sit->recvbuf, ret);
ev.events = EPOLLIN | EPOLLOUT; //
sit->sockfd = fd;
sit->callback = send_cb;
ev.data.ptr = sit;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}
return ret;
}
int send_cb(int fd, int events, void*arg)
{
struct epoll_event ev;
struct sockitem *sit = (struct sockitem*)arg;
strncpy(sit->sendbuf, sit->recvbuf, sizeof(sit->recvbuf) + 1);
send(fd, sit->sendbuf, sizeof(sit->recvbuf) + 1, 0);
ev.events = EPOLLIN | EPOLLET; //
sit->sockfd = fd;
sit->callback = recv_cb;
ev.data.ptr = sit;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
return fd;
}
int main(int argc, char* argv[])
{
if (argc < 2)
return -1;
int port = atoi(argv[1]); //字符串轉(zhuǎn)換為整型
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
return -1;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in)); //新申請(qǐng)的空間一定要置零
addr.sin_family = AF_INET;
addr.sin_port = htons(port); //轉(zhuǎn)換成網(wǎng)絡(luò)字節(jié)序
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
return -2;
if (listen(sockfd, 5) < 0)
return -3;
//epoll
eventloop = (struct epollitem *)malloc(sizeof(struct epollitem));
eventloop->epfd = epoll_create(1); //創(chuàng)建epoll,相當(dāng)于紅黑樹(shù)的根節(jié)點(diǎn)
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = sockfd;
si->callback = accept_cb;
ev.data.ptr = si; //將fd和對(duì)應(yīng)的回調(diào)函數(shù)綁定一起帶進(jìn)epoll
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev); //將ev節(jié)點(diǎn)加入到epoll,此處的sockfd參數(shù)隨便添加沒(méi)有意義,需要操作系統(tǒng)索引和它有對(duì)應(yīng)的句柄
while (1)
{
int nready = epoll_wait(eventloop->epfd, eventloop->events, 1024, -1); //第四個(gè)參數(shù)-1表示一直等待,有事件才返回
if (nready < 1) //沒(méi)有事件觸發(fā),nready代表觸發(fā)事件的個(gè)數(shù)
break;
int i = 0;
for (i = 0; i < nready; i++)
{
//區(qū)分fd的讀寫(xiě)操作
if (eventloop->events[i].events & EPOLLIN)
{
struct sockitem *sit = (struct sockitem*)eventloop->events[i].data.ptr;
sit->callback(sit->sockfd, eventloop->events[i].events, sit); //不用區(qū)分listen_fd和recv_fd,相應(yīng)的fd都會(huì)調(diào)用他們所對(duì)應(yīng)的callback
}
//為什么需要判斷EPOLLOUT,而不是直接else?因?yàn)橐粋€(gè)fd有可能同時(shí)存在可讀和可寫(xiě)事件的
if (eventloop->events[i].events & EPOLLOUT)
{
struct sockitem *sit = (struct sockitem*)eventloop->events[i].data.ptr;
sit->callback(sit->sockfd, eventloop->events[i].events, sit);
}
}
}
return 0;
}
4、Reactor模型的應(yīng)用?
????????1、單線(xiàn)程模式的Reactor,參考libevent、redis;
????????2、多線(xiàn)程模式的Reactor,參考memcached;文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-687397.html
????????3、多進(jìn)程模式的Reactor,參考nginx。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-687397.html
到了這里,關(guān)于深入理解Reactor模型的原理與應(yīng)用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!