?
文章目錄
一、項(xiàng)目簡(jiǎn)介
二、項(xiàng)目整體認(rèn)識(shí)
2、1?HTTP服務(wù)器
2、2 Reactor模型
三、預(yù)備知識(shí)
3、1 C++11 中的 bind
3、2?簡(jiǎn)單的秒級(jí)定時(shí)任務(wù)實(shí)現(xiàn)
3、3?正則庫(kù)的簡(jiǎn)單使用
3、4?通用類(lèi)型any類(lèi)型的實(shí)現(xiàn)
四、服務(wù)器功能模塊劃分與實(shí)現(xiàn)
4、1 Buffer模塊
4、2 Socket模塊
4、3 Channel模塊
4、4 Poller模塊
4、5 Eventloop模塊
4、5、1 時(shí)間輪思想
4、5、2?TimerWheel定時(shí)器模塊整合
4、5、3 Channel 與 EventLoop整合
4、5、3 時(shí)間輪與EventLoop整合
4、6 Connection模塊
4、7 Acceptor模塊
4、8 LoopThread模塊
4、9 LoopThreadPool模塊
4、10 TcpServer模塊
4、11 測(cè)試代碼
五、HTTP協(xié)議支持實(shí)現(xiàn)
5、1 Util模塊
5、2 HttpRequest模塊
5、3 HttpResponse模塊
5、4 HttpContext模塊
5、5 HttpServer模塊
六、對(duì)服務(wù)器進(jìn)行測(cè)試
6、1 長(zhǎng)連接測(cè)試
6、2 不完整報(bào)文請(qǐng)求
6、3?業(yè)務(wù)處理超時(shí)測(cè)試
6、4?一次發(fā)送多條數(shù)據(jù)測(cè)試
6、5?大文件傳輸測(cè)試
6、6 性能測(cè)試
???♂??作者:@Ggggggtm????♂?
???專(zhuān)欄:實(shí)戰(zhàn)項(xiàng)目??
???標(biāo)題: 仿muduo庫(kù)實(shí)現(xiàn)one thread one loop式并發(fā)服務(wù)器 ??
????寄語(yǔ):與其忙著訴苦,不如低頭趕路,奮路前行,終將遇到一番好風(fēng)景???
一、項(xiàng)目簡(jiǎn)介
? 該項(xiàng)目目標(biāo)是實(shí)現(xiàn)一個(gè)高并發(fā)的服務(wù)器。但并不是自己完全實(shí)現(xiàn)一個(gè),而是仿照現(xiàn)有成熟的技術(shù)進(jìn)行模擬實(shí)現(xiàn)。
? 一些必備知識(shí):線程、網(wǎng)絡(luò)套接字編程、多路轉(zhuǎn)接技術(shù)(epoll),另外還有一些小的知識(shí),在本篇文章中會(huì)提前講解。
? 本項(xiàng)目主要分為多個(gè)模塊來(lái)進(jìn)行講解,實(shí)際上就是一個(gè)個(gè)小的組件。通過(guò)這些組件,我們可以很快的搭建起來(lái)一個(gè)高并發(fā)式的服務(wù)器。
二、項(xiàng)目整體認(rèn)識(shí)
2、1?HTTP服務(wù)器
? 該項(xiàng)目組件內(nèi)提供的不同應(yīng)用層協(xié)議支持,由于應(yīng)用層協(xié)議有很多,我們就在項(xiàng)目中提供較為常見(jiàn)的HTTP協(xié)議組件支持。
? HTTP(Hyper Text Transfer Protocol),超?本傳輸協(xié)議是應(yīng)?層協(xié)議,是?種簡(jiǎn)單的請(qǐng)求-響應(yīng)協(xié)議(客戶端根據(jù)自己的需要向服務(wù)器發(fā)送請(qǐng)求,服務(wù)器針對(duì)請(qǐng)求提供服務(wù),完畢后通信結(jié)束)。
? 但是需要注意的是HTTP協(xié)議是?個(gè)運(yùn)行在TCP協(xié)議之上的應(yīng)用層協(xié)議,這?點(diǎn)本質(zhì)上是告訴我們,HTTP服務(wù)器其實(shí)就是個(gè)TCP服務(wù)器,只不過(guò)在應(yīng)用層基于HTTP協(xié)議格式進(jìn)行數(shù)據(jù)的組織和解析來(lái)明確客?端的請(qǐng)求并完成業(yè)務(wù)處理。? 因此實(shí)現(xiàn)HTTP服務(wù)器簡(jiǎn)單理解,只需要以下幾步即可:
- 搭建?個(gè)TCP服務(wù)器,接收客戶端請(qǐng)求。
- 以HTTP協(xié)議格式進(jìn)行解析請(qǐng)求數(shù)據(jù),明確客戶端目的。
- 明確客戶端請(qǐng)求目的后提供對(duì)應(yīng)服務(wù)。
- 將服務(wù)結(jié)果?HTTP協(xié)議格式進(jìn)行組織,發(fā)送給客戶端實(shí)現(xiàn)?個(gè)HTTP服務(wù)器很簡(jiǎn)單,但是實(shí)現(xiàn)?個(gè)高性能的服務(wù)器并不簡(jiǎn)單,這個(gè)單元中將講解基于Reactor模式的高性能服務(wù)器實(shí)現(xiàn)。
2、2 Reactor模型
? Reactor模式,是指通過(guò)?個(gè)或多個(gè)輸入同時(shí)傳遞給服務(wù)器進(jìn)行請(qǐng)求處理時(shí)的事件驅(qū)動(dòng)處理模式。
? 服務(wù)端程序處理傳入多路請(qǐng)求,并將它們同步分派給請(qǐng)求對(duì)應(yīng)的處理線程,Reactor 模式也叫Dispatcher模式。簡(jiǎn)單理解就是使用?I/O 多路復(fù)用 統(tǒng)?監(jiān)聽(tīng)事件(Reactor 模式就是基于IO多路復(fù)用構(gòu)建起來(lái)的),收到事件后分發(fā)給處理進(jìn)程或線程,是編寫(xiě)高性能網(wǎng)絡(luò)服務(wù)器的必備技術(shù)之?。
? 網(wǎng)絡(luò)模型演化過(guò)程中,將建立連接、IO等待/讀寫(xiě)以及事件轉(zhuǎn)發(fā)等操作分階段處理,然后可以對(duì)不同階段采用相應(yīng)的優(yōu)化策略來(lái)提高性能;也正是如此,Reactor 模型在不同階段都有相關(guān)的優(yōu)化策略,常見(jiàn)的有以下三種方式呈現(xiàn):
- 單Reactor單線程模型:?jiǎn)蜪/O多路復(fù)用+業(yè)務(wù)處理;
- 單Reactor多線程模型:?jiǎn)蜪/O多路復(fù)用+線程池;
- 多Reactor多線程模型:多I/O多路復(fù)用+線程池。
? ?下面我們來(lái)具體分析一下其優(yōu)缺點(diǎn)。
? ?單Reactor單線程:在單個(gè)線程中進(jìn)行事件監(jiān)控并處理。具體步驟如下:
- 通過(guò)IO多路復(fù)用模型進(jìn)行客戶端請(qǐng)求監(jiān)控。
- 觸發(fā)事件后,進(jìn)行事件處理。
- 如果是新建連接請(qǐng)求,則獲取新建連接,并添加至多路復(fù)用模型進(jìn)行事件監(jiān)控。如果是數(shù)據(jù)通信請(qǐng)求,則進(jìn)行對(duì)應(yīng)數(shù)據(jù)處理(接收數(shù)據(jù),處理數(shù)據(jù),發(fā)送響應(yīng))。
??
- 優(yōu)點(diǎn):所有操作均在同?線程中完成,思想流程較為簡(jiǎn)單,不涉及進(jìn)程/線程間通信及資源爭(zhēng)搶問(wèn)題。
- 缺點(diǎn):無(wú)法有效利用CPU多核資源,很容易達(dá)到性能瓶頸。
- 適用場(chǎng)景:適用于客戶端數(shù)量較少,且處理速度較為快速的場(chǎng)景。(處理較慢或活躍連接較多,會(huì)導(dǎo)致串行處理的情況下,后處理的連接長(zhǎng)時(shí)間無(wú)法得到響應(yīng))
? 單Reactor多線程:一個(gè)Reactor進(jìn)行時(shí)間監(jiān)控,由多個(gè)線程(線程池)來(lái)處理就緒事件。
- Reactor線程通過(guò)I/O多路復(fù)用模型進(jìn)行客戶端請(qǐng)求監(jiān)控;
- 觸發(fā)事件后,進(jìn)行事件處理
- 如果是新建連接請(qǐng)求,則獲取新建連接,并添加至多路復(fù)用模型進(jìn)行事件監(jiān)控。
- 如果是數(shù)據(jù)通信請(qǐng)求,則接收數(shù)據(jù)后分發(fā)給Worker線程池進(jìn)行業(yè)務(wù)處理。
- 工作線程處理完畢后,將響應(yīng)交給Reactor線程進(jìn)行數(shù)據(jù)響應(yīng)。
? 其優(yōu)缺點(diǎn)如下:
- 優(yōu)點(diǎn):充分利用CPU多核資源
- 缺點(diǎn):多線程間的數(shù)據(jù)共享訪問(wèn)控制較為復(fù)雜,單個(gè)Reactor 承擔(dān)所有事件的監(jiān)聽(tīng)和響應(yīng),在單線程中運(yùn)行,高并發(fā)場(chǎng)景下容易成為性能瓶頸。
? 多Reactor多線程:多I/O多路復(fù)用進(jìn)行時(shí)間監(jiān)控,同時(shí)使用線程池來(lái)對(duì)就緒時(shí)間進(jìn)行處理。
- 在主Reactor中處理新連接請(qǐng)求事件,有新連接到來(lái)則分發(fā)到子Reactor中監(jiān)控
- 在子Reactor中進(jìn)行客戶端通信監(jiān)控,有事件觸發(fā),則接收數(shù)據(jù)分發(fā)給Worker線程池
- Worker線程池分配獨(dú)立的線程進(jìn)行具體的業(yè)務(wù)處理
- 工作線程處理完畢后,將響應(yīng)交給子Reactor線程進(jìn)行數(shù)據(jù)響應(yīng)。
? 優(yōu)點(diǎn):充分利用CPU多核資源,主從Reactor各司其職。但是大家也要理解:執(zhí)行流并不是越多越好,因?yàn)閳?zhí)行流多了,反而會(huì)增加cpu切換調(diào)度的成本。
? 目標(biāo)定位-One Thread One Loop主從Reactor模型高并發(fā)服務(wù)器。
? 咱們要實(shí)現(xiàn)的是主從Reactor模型服務(wù)器,也就是主Reactor線程僅僅監(jiān)控監(jiān)聽(tīng)描述符,獲取新建連接,保證獲取新連接的高效性,提高服務(wù)器的并發(fā)性能。? 主Reactor獲取到新連接后分發(fā)給子Reactor進(jìn)行通信事件監(jiān)控。而子Reactor線程監(jiān)控各自的描述符的讀寫(xiě)事件進(jìn)行數(shù)據(jù)讀寫(xiě)以及業(yè)務(wù)處理。
? One Thread One Loop的思想就是把所有的操作都放到?個(gè)線程中進(jìn)行,?個(gè)線程對(duì)應(yīng)?個(gè)事件處理的循環(huán)。
? 當(dāng)前實(shí)現(xiàn)中,因?yàn)椴⒉淮_定組件使用者的使用意向,因此并不提供業(yè)務(wù)層工作線程池的實(shí)現(xiàn),只實(shí)現(xiàn)主從Reactor,而Worker工作線程池,可由組件庫(kù)的使用者的需要自行決定是否使用和實(shí)現(xiàn)。
? 對(duì)比上個(gè)模型,One Thread One Loop主從Reactor模型高并發(fā)服務(wù)器結(jié)構(gòu)圖如下:
三、預(yù)備知識(shí)
3、1 C++11 中的 bind
? bind也是一種函數(shù)包裝器,也叫做適配器。它可以接受一個(gè)可調(diào)用對(duì)象,以及函數(shù)的各項(xiàng)參數(shù),然后返回?個(gè)新的函數(shù)對(duì)象,但是這個(gè)函數(shù)對(duì)象的參數(shù)已經(jīng)被綁定為設(shè)置的參數(shù)。運(yùn)?的時(shí)候相當(dāng)于總是調(diào)用傳入固定參數(shù)的原函數(shù)。
? 調(diào)用bind的一般形式為:auto newCallable = bind(callable, arg_list);
? 解釋說(shuō)明:
- callable:需要包裝的可調(diào)用對(duì)象。
- newCallable:生成的新的可調(diào)用對(duì)象。
- arg_list:逗號(hào)分隔的參數(shù)列表,對(duì)應(yīng)給定的callable的參數(shù)。當(dāng)調(diào)用newCallable時(shí),newCallable會(huì)調(diào)用callable,并傳給它arg_list中的參數(shù)。
? arg_list中的參數(shù)可能包含形如_n的名字,其中n是一個(gè)整數(shù),這些參數(shù)是“占位符”,表示newCallable的參數(shù),它們占據(jù)了傳遞給newCallable的參數(shù)的“位置”。數(shù)值n表示生成的可調(diào)用對(duì)象中參數(shù)的位置,比如_1為newCallable的第一個(gè)參數(shù),_2為第二個(gè)參數(shù),以此類(lèi)推。
此外,除了用auto接收包裝后的可調(diào)用對(duì)象,也可以用function類(lèi)型指明返回值和形參類(lèi)型后接收包裝后的可調(diào)用對(duì)象。當(dāng)然,arg_list中的參數(shù)也可以綁定固定的值。下面我們來(lái)結(jié)合幾個(gè)例子理解一下。
? ?綁定固定值如下:
int Plus(int a, int b) { return a + b; } int main() { //綁定固定參數(shù) function<int()> func = bind(Plus, 10, 10); cout << func() << endl; return 0; }
? 在上述代碼中,func()相當(dāng)于調(diào)用了Plus(10,10)。因?yàn)槲覀兘壎斯潭ǖ膬蓚€(gè)參數(shù)值,所以直接調(diào)用即可。接下來(lái)我們?cè)倏匆幌率褂谜刮环M(jìn)行綁定。代碼如下:
int Plus(int a, int b) { return a + b; } int main() { //綁定固定參數(shù) function<int(int)> func = bind(Plus, placeholders::_1, 10); cout << func(2) << endl; //12 return 0; }
? 這里的 placeholders::_1 就是一個(gè)占位符,相當(dāng)于func中傳入的第一個(gè)參數(shù)。
? 上述的場(chǎng)景并不使用,一般情況我們會(huì)在對(duì)類(lèi)內(nèi)的成員函數(shù)進(jìn)行綁定,因?yàn)樵陬?lèi)外調(diào)用類(lèi)內(nèi)成員函數(shù)時(shí),由于類(lèi)內(nèi)的成員函數(shù)第一個(gè)參數(shù)是都是this指針,所以很不方便調(diào)用,于是我們可以綁定一個(gè)this指針,或者匿名對(duì)象都是可以的,這樣就可以正常的進(jìn)行調(diào)用了。結(jié)合如下例子理解一下:
class Sub { public: int sub(int a, int b) { return a - b; } }; int main() { //綁定固定參數(shù) function<int(int, int)> func = bind(&Sub::sub, Sub(), placeholders::_1, placeholders::_2); cout << func(1, 2) << endl; //-1 return 0; }
? 還有一種場(chǎng)景,bind函數(shù)有個(gè)好處就是,這種任務(wù)池在設(shè)計(jì)的時(shí)候,不?考慮都有哪些任務(wù)處理方式了,處理函數(shù)該如何設(shè)計(jì),有多少個(gè)什么樣的參數(shù),這些都不用考慮了,降低了代碼之間的耦合度。代碼如下:
#include <iostream> #include <string> #include <vector> #include <functional> void print(const std::string &str) { std::cout << str << std::endl; } int main() { using Functor = std::function<void()>; std::vector<Functor> task_pool; task_pool.push_back(std::bind(print, "你好")); task_pool.push_back(std::bind(print, "我是")); task_pool.push_back(std::bind(print, "Ggggggtm")); for (auto &functor : task_pool) { functor(); } return 0; }
? 在上述代碼中,print函數(shù)就是我們要執(zhí)行的任務(wù),當(dāng)然還可以有其他的函數(shù)。如果沒(méi)有bind,那么處理各種不同參數(shù)的函數(shù)是很麻煩的,而這里我們只需要一個(gè)bind函數(shù),可以將他們同意看成無(wú)參的函數(shù)。
3、2?簡(jiǎn)單的秒級(jí)定時(shí)任務(wù)實(shí)現(xiàn)
? 在當(dāng)前的?并發(fā)服務(wù)器中,我們不得不考慮?個(gè)問(wèn)題,那就是連接的超時(shí)關(guān)閉問(wèn)題。我們需要避免?個(gè)連接?時(shí)間不通信,但是也不關(guān)閉,空耗資源的情況。這時(shí)候我們就需要?個(gè)定時(shí)任務(wù),定時(shí)的將超時(shí)過(guò)期的連接進(jìn)?釋放。? Linux中給我們提供了定時(shí)器,代碼如下:#include <sys/timerfd.h> int timerfd_create(int clockid, int flags); //clockid : CLOCK_REALTIME - 系統(tǒng)實(shí)時(shí)時(shí)間,如果修改了系統(tǒng)時(shí)間就會(huì)出問(wèn)題; CLOCK_MONOTONIC - 從開(kāi)機(jī)到現(xiàn)在的時(shí)間是?種相對(duì)時(shí)間; flags : 0 - 默認(rèn)阻塞屬性 int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); //fd : timerfd_create返回的?件描述符 // flags : 0 - // 相對(duì)時(shí)間, 1 - 絕對(duì)時(shí)間;默認(rèn)設(shè)置為0即可.new: ?于設(shè)置定時(shí)器的新超時(shí)時(shí)間 old: ?于接 收原來(lái)的超時(shí)時(shí)間 struct timespec { time_t tv_sec; /* Seconds */ long tv_nsec; /* Nanoseconds */ }; struct itimerspec { struct timespec it_interval; /* 第?次之后的超時(shí)間隔時(shí)間 */ struct timespec it_value; /* 第?次超時(shí)時(shí)間 */ }; // 定時(shí)器會(huì)在每次超時(shí)時(shí),?動(dòng)給fd中寫(xiě)?8字節(jié)的數(shù)據(jù),表?在上?次讀取數(shù)據(jù)到當(dāng)前讀取數(shù)據(jù)期間超 // 時(shí)了多少次。
? 下面我們來(lái)結(jié)合一個(gè)實(shí)際的例子來(lái)看一下。具體如下:
#include <iostream> #include <stdio.h> #include <errno.h> #include <sys/timerfd.h> #include <unistd.h> int main() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if(timerfd < 0) { perror("timerfd_create error"); exit(2); } struct itimerspec itm; itm.it_value.tv_sec = 3; itm.it_value.tv_nsec = 0; itm.it_interval.tv_sec = 3; itm.it_interval.tv_nsec = 0; timerfd_settime(timerfd, 0, &itm, nullptr); while(true) { uint64_t tmp; int n = read(timerfd, &tmp, sizeof tmp); if(n < 0) { perror("read error"); exit(3); } std::cout << "超時(shí)了,距離上一次超時(shí): " << tmp << " 次數(shù)" << std::endl; } return 0; }
? 其實(shí)上述代碼我們就設(shè)置了一個(gè)每3秒鐘的定時(shí)器,也就是每個(gè)3秒鐘,都會(huì)出發(fā)一次,相當(dāng)于每個(gè)3秒鐘像文件中寫(xiě)入一次數(shù)據(jù)。運(yùn)行結(jié)果如下圖:
? 注意,后面我們會(huì)根據(jù)定時(shí)器實(shí)現(xiàn)一個(gè)時(shí)間輪來(lái)完成對(duì)超時(shí)任務(wù)的釋放銷(xiāo)毀。這里你可能還不理解超時(shí)任務(wù)的釋放銷(xiāo)毀,或許會(huì)詳細(xì)講解到。?
3、3?正則庫(kù)的簡(jiǎn)單使用
? 正則表達(dá)式(regular expression)描述了一種字符串匹配的模式(pattern),可以用來(lái)檢查一個(gè)串是否含有某種子串、將匹配的子串替換或者從某個(gè)串中取出符合某個(gè)條件的子串等。
??正則表達(dá)式的使用,可以使得HTTP請(qǐng)求的解析更加簡(jiǎn)單(這里指的時(shí)程序員的工作變得的簡(jiǎn)單,這并不代表處理效率會(huì)變高,實(shí)際上效率上是低于直接的字符串處理的),使我們實(shí)現(xiàn)的HTTP組件庫(kù)使用起來(lái)更加靈活。
? 本篇文章就不再過(guò)多對(duì)正則表達(dá)式的詳細(xì)使用進(jìn)行詳解,但是代碼中會(huì)有注釋?zhuān)欢男』锇榭梢匀∷阉飨嚓P(guān)文章進(jìn)行學(xué)習(xí)。實(shí)例代碼如下:
#include <regex> void req_line() { std::cout << "------------------first line start-----------------\n"; // std::string str = "GET /bitejiuyeke HTTP/1.1\r\n"; // std::string str = "GET /bitejiuyeke HTTP/1.1\n"; std::string str = "GET /bitejiuyeke?a=b&c=d HTTP/1.1\r\n"; // 匹配規(guī)則 std::regex re("(GET|HEAD|POST|PUT|DELETE) (([^?]+)(?:\\?(.*?))?) (HTTP/1\\.[01])(?:\r\n |\n)"); std::smatch matches; std::regex_match(str, matches, re); /*正則匹配獲取完畢之后matches中的存儲(chǔ)情況*/ /* matches[0] 整體?? GET /bitejiuyeke?a=b&c=d HTTP/1.1 matches[1] 請(qǐng)求?法 GET matches[2] 整體URL /bitejiuyeke?a=b&c=d matches[3] ?之前 /bitejiuyeke matches[4] 查詢字符串 a=b&c=d matches[5] 協(xié)議版本 HTTP/1.1 */ int i = 0; for (const auto &it : matches) { std::cout << i++ << ": "; std::cout << it << std::endl; } if (matches[4].length() > 0) { std::cout << "have param!\n"; } else { std::cout << "have not param!\n"; } std::cout << "------------------first line start-----------------\n"; return; } void method_match(const std::string str) { std::cout << "------------------method start-----------------\n"; std::regex re("(GET|HEAD|POST|PUT|DELETE) .*"); /* () 表?捕捉符合括號(hào)內(nèi)格式的數(shù)據(jù) * GET|HEAD|POST... |表?或,也就是匹配這?個(gè)字符串中的任意?個(gè) * .* 中.表?匹配除換?外的任意單字符, *表?匹配前邊的字符任意次; 合起來(lái)在這?就是 表?空格后匹配任意字符 * 最終合并起來(lái)表?匹配以GET或者POST或者PUT...?個(gè)字符串開(kāi)始,然后后邊有個(gè)空格的字 符串, 并在匹配成功后捕捉前邊的請(qǐng)求?法字符串 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------method over------------------\n"; } void path_match(const std::string str) { // std::regex re("(([^?]+)(?:\\?(.*?))?)"); std::cout << "------------------path start------------------\n"; std::regex re("([^?]+).*"); /* * 最外層的() 表?捕捉提取括號(hào)內(nèi)指定格式的內(nèi)容 * ([^?]+) [^xyz] 負(fù)值匹配集合,指匹配?^之后的字符, ?如[^abc] 則plain就匹配到 plin字符 * +匹配前?的?表達(dá)式?次或多次 * 合并合并起來(lái)就是匹配??字符?次或多次 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------path over------------------\n"; } void query_match(const std::string str) { std::cout << "------------------query start------------------\n"; std::regex re("(?:\\?(.*?))? .*"); /* * (?:\\?(.*?))? 最后的?表?匹配前邊的表達(dá)式0次或1次,因?yàn)橛械恼?qǐng)求可能沒(méi)有查詢 字符串 * (?:\\?(.*?)) (?:pattern)表?匹配pattern但是不獲取匹配結(jié)果 * \\?(.*?) \\?表?原始的?字符,這?表?以?字符作為起始 * .表?\n之外任意單字符, *表?匹配前邊的字符0次或多次, ?跟在*或+之后表?懶惰模式, 也就是說(shuō)以?開(kāi)始的字符串就只匹配這?次就?, 后邊還有以?開(kāi)始的同格式字符串也不不會(huì)匹配 () 表?捕捉獲取符合內(nèi)部格式的數(shù)據(jù) * 合并起來(lái)表?的就是,匹配以?開(kāi)始的字符串,但是字符串整體不要, * 只捕捉獲取?之后的字符串,且只匹配?次,就算后邊還有以?開(kāi)始的同格式字符串也不不會(huì)匹 配 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------query over------------------\n"; } void version_mathch(const std::string str) { std::cout << "------------------version start------------------\n"; std::regex re("(HTTP/1\\.[01])(?:\r\n|\n)"); /* * (HTTP/1\\.[01]) 外層的括號(hào)表?捕捉字符串 * HTTP/1 表?以HTTP/1開(kāi)始的字符串 * \\. 表?匹配 . 原始字符 * [01] 表?匹配0字符或者1字符 * (?:\r\n|\n) 表?匹配?個(gè)\r\n或者\(yùn)n字符,但是并不捕捉這個(gè)內(nèi)容 * 合并起來(lái)就是匹配以HTTP/1.開(kāi)始,后邊跟了?個(gè)0或1的字符,且最終以\n或者\(yùn)r\n作為結(jié) 尾的字符串 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------version over------------------\n"; }
3、4?通用類(lèi)型any類(lèi)型的實(shí)現(xiàn)
? ?所謂通用類(lèi)型,就是可以存儲(chǔ)任意類(lèi)型。我們第一時(shí)間可能想到通過(guò)模板來(lái)實(shí)現(xiàn),代碼如下:
template<class T> class Any { T _any; };
? 但上述并不是我們想要的。但是我們?cè)诙xAny對(duì)象時(shí),必須指定參數(shù)。使用模板并不是我們想要的,我們想要的是如下:
/* template<class T> class Any { T _any; }; Any<int> a; a = 10; */ // 我們實(shí)際上想要的 Any a; a = 10; a = "Ggggggtm";
? 所以使用模板是肯定不行的。那我們就想到了類(lèi)內(nèi)再嵌套一個(gè)類(lèi),這樣行不行呢?
class Any { private: template<class T> class placeholder { T _val; }; placeholder<T> _content; };
? 這樣好像也不太行,因?yàn)槲覀冊(cè)趯?shí)例化Any類(lèi)內(nèi)中的placeholder對(duì)象時(shí),也必須指定類(lèi)型。那么有沒(méi)有什么很好的辦法,在實(shí)例化Any類(lèi)中的成員變量對(duì)象時(shí),不用指定其類(lèi)型還能很好的存儲(chǔ)任意類(lèi)型呢?這里就可以使用多態(tài)的方法。思路是:
- 利用多態(tài)的特點(diǎn),父類(lèi)對(duì)象指向子類(lèi)對(duì)象,也可以安全的訪問(wèn)子類(lèi)對(duì)象中的成員;
- 子類(lèi)使用模板,來(lái)存儲(chǔ)任意類(lèi)型;
- Any類(lèi)中存儲(chǔ)父類(lèi)對(duì)象指針,來(lái)調(diào)用子類(lèi)成員。
- 當(dāng)我們存儲(chǔ)任意類(lèi)型時(shí),new一個(gè)子類(lèi)對(duì)象來(lái)保存數(shù)據(jù), 然后用子類(lèi)對(duì)象來(lái)初始化Any類(lèi)中的所保存的父類(lèi)(holder)對(duì)象指針即可。
? 大體的思路代碼如下:
class Any { private: class holder { //...... }; template<class T> class placeholder : public holder { //..... T _val; }; holder* _content; };
? 整體的思路有了,下面我們直接給出實(shí)現(xiàn)代碼,其中詳細(xì)細(xì)節(jié)就不再過(guò)多解釋。代碼如下:
class Any { public: Any() :_content(nullptr) {} template<class T> Any(const T& val) :_content(new placeholder<T>(val)) {} Any(const Any& other) :_content(other._content ? other._content->clone() : nullptr) {} ~Any() { delete _content; } template<class T> T* get() { assert(typeid(T) == _content->type()); return &(((placeholder<T>*)_content)->_val); } Any& Swap(Any& other) { std::swap(_content, other._content); return *this; } template<class T> Any& operator=(const T& val) { Any(val).Swap(*this); return *this; } Any& operator=(const Any& other) { Any(other).Swap(*this); return *this; } private: class holder { public: virtual ~holder() {} virtual const std::type_info& type() = 0; virtual holder* clone() = 0; }; template<class T> class placeholder : public holder { public: placeholder(const T& val) :_val(val) {} virtual const std::type_info& type() { return typeid(T); } virtual holder* clone() { return new placeholder(_val); } public: T _val; }; holder *_content; };
四、服務(wù)器功能模塊劃分與實(shí)現(xiàn)
? 實(shí)現(xiàn)一個(gè)Reactor模式的服務(wù)器,首先肯定需要進(jìn)行網(wǎng)絡(luò)套接字編程。Reactor模式就是基于多路轉(zhuǎn)接技術(shù)繼續(xù)進(jìn)行實(shí)現(xiàn)的,那么我們肯定需要對(duì)IO事件進(jìn)行監(jiān)控,然后對(duì)就緒的IO事件進(jìn)行處理。怎么判斷接收到的數(shù)據(jù)是否是一份完整的數(shù)據(jù)呢?所以在這里我們還要進(jìn)行協(xié)議定制。當(dāng)然,我們用的就是HTTP協(xié)議模式進(jìn)行傳輸數(shù)據(jù)。那么不夠一份完整的報(bào)文時(shí),我們需要將接收到的數(shù)據(jù)暫時(shí)保存起來(lái),那么肯定還需要定義一個(gè)接受和發(fā)送緩沖區(qū)。同時(shí)我們這個(gè)所實(shí)現(xiàn)的服務(wù)器當(dāng)中,還添加了對(duì)不活躍鏈接的銷(xiāo)毀,在后面我們也會(huì)詳細(xì)講到。
4、1 Buffer模塊
? Buffer模塊是一個(gè)緩沖區(qū)模塊,用于實(shí)現(xiàn)通信中用戶態(tài)的接收緩沖區(qū)和發(fā)送緩沖區(qū)功能。Buffer模塊主要就是用于當(dāng)我們接收到一個(gè)不完整的報(bào)文時(shí),需要將該報(bào)文暫時(shí)保存起來(lái)。同時(shí),我們?cè)趯?duì)于客戶端響應(yīng)的數(shù)據(jù),應(yīng)該是在套接字可寫(xiě)的情況下進(jìn)行發(fā)送,所以需要把數(shù)據(jù)放到暫時(shí)放到Buffer 的發(fā)送緩沖區(qū)當(dāng)中。
? 對(duì)于緩沖區(qū),我們只需要一段線性的空間來(lái)保存即可。那就可以直接用vector即可。我們實(shí)現(xiàn)的功能大概如下:
? 寫(xiě)入位置:
- 當(dāng)前寫(xiě)入位置指向哪里,從哪里開(kāi)始寫(xiě)入
- 如果后續(xù)剩余空間不夠了!
- 考慮整體緩沖區(qū)空閑空間是否足夠!(因?yàn)樽x位置也會(huì)向后偏移,前后有可能有空閑空間)
- 緩沖區(qū)空閑空間足夠:將數(shù)據(jù)移動(dòng)到起始位置
- 緩沖區(qū)空閑空間不夠:擴(kuò)容,從當(dāng)前寫(xiě)位置開(kāi)始擴(kuò)容足夠大小!
- 數(shù)據(jù)一旦寫(xiě)入成功,當(dāng)前寫(xiě)位置,向后偏移!
? 讀取數(shù)據(jù):
- 當(dāng)前的讀取位置指向哪里,就從哪里開(kāi)始讀取,前提是有數(shù)據(jù)可讀
- 可讀數(shù)據(jù)大?。寒?dāng)前寫(xiě)入位置,減去當(dāng)前讀取位置!
? 整體實(shí)現(xiàn)相對(duì)來(lái)說(shuō)較為簡(jiǎn)單,這里我們就直接給出代碼,就不再做過(guò)多解釋。
#include <ctime> #include <cstring> #include <iostream> #include <vector> #include <cassert> #include <string> using namespace std; #define BUFFER_SIZE 1024 class Buffer { private: std::vector<char> _buffer; // 使用vector進(jìn)行內(nèi)存空間管理 uint64_t _read_idx; // 讀偏移 uint64_t _write_idx; // 寫(xiě)偏移 public: Buffer():_read_idx(0),_write_idx(0),_buffer(BUFFER_SIZE) {} char* begin() {return &*_buffer.begin();} // 獲取當(dāng)前寫(xiě)入起始地址 char *writePosition() { return begin() + _write_idx;} // 獲取當(dāng)前讀取起始地址 char *readPosition() { return begin() + _read_idx; } // 獲取緩沖區(qū)末尾空間大小 —— 寫(xiě)偏移之后的空閑空間,總體大小減去寫(xiě)偏移 uint64_t tailIdleSize() {return _buffer.size() - _write_idx; } // 獲取緩沖區(qū)起始空間大小 —— 讀偏移之前的空閑空間 uint64_t handIdleSize() {return _read_idx ;} // 獲取可讀空間大小 = 寫(xiě)偏移 - 讀偏移 uint64_t readAbleSize() {return _write_idx - _read_idx ;} // 將讀偏移向后移動(dòng) void moveReadOffset(uint64_t len) { // 向后移動(dòng)大小必須小于可讀數(shù)據(jù)大小 assert(len <= readAbleSize()); _read_idx += len; } // 將寫(xiě)偏移向后移動(dòng) void moveWriteOffset(uint64_t len) { assert(len <= tailIdleSize()); _write_idx += len; } void ensureWriteSpace(uint64_t len) { // 確保可寫(xiě)空間足夠 (整體空間夠了就移動(dòng)數(shù)據(jù),否則就擴(kuò)容?。? if (tailIdleSize() >= len) return; // 不夠的話 ,判斷加上起始位置夠不夠,夠了將數(shù)據(jù)移動(dòng)到起始位置 if (len <= tailIdleSize() + handIdleSize()) { uint64_t rsz = readAbleSize(); //幫當(dāng)前數(shù)據(jù)大小先保存起來(lái) std::copy(readPosition(),readPosition() + rsz,begin()); // 把可讀數(shù)據(jù)拷貝到起始位置 _read_idx = 0; // 讀歸為0 _write_idx = rsz; // 可讀數(shù)據(jù)大小是寫(xiě)的偏移量! } else { // 總體空間不夠!需要擴(kuò)容,不移動(dòng)數(shù)據(jù),直接給寫(xiě)偏移之后擴(kuò)容足夠空間即可! _buffer.resize(_write_idx + len); } } // 寫(xiě)入數(shù)據(jù) void Write(const void *data,uint64_t len) { ensureWriteSpace(len); const char *d = (const char*) data; std::copy(d,d + len,writePosition()); } void WriteAndPush(void* data,uint64_t len) { Write(data,len); moveWriteOffset(len); } void WriteStringAndPush(const std::string &data) { writeString(data); moveWriteOffset(data.size()); } void writeString(const std::string &data) { return Write(data.c_str(),data.size()); } void writeBuffer(Buffer &data) { return Write(data.readPosition(),data.readAbleSize()); } void writeBufferAndPush(Buffer &data) { writeBuffer(data); moveWriteOffset(data.readAbleSize()); } std::string readAsString (uint64_t len) { assert(len <= readAbleSize()); std::string str; str.resize(len); Read(&str[0],len); return str; } void Read(void *buf,uint64_t len) { // 讀取數(shù)據(jù) 1. 保證足夠的空間 2.拷貝數(shù)據(jù)進(jìn)去 // 要求獲取的大小必須小于可讀數(shù)據(jù)大?。? assert(len <= readAbleSize()); std::copy(readPosition(),readPosition() + len,(char*)buf); } void readAndPop(void *buf,uint64_t len) { Read(buf,len); moveReadOffset(len); } // 逐步調(diào)試?。。。。? std::string ReadAsStringAndPop(uint64_t len) { assert(len <= readAbleSize()); std::string str = readAsString(len); moveReadOffset(len); return str; } char* FindCRLF() { char *res = (char*)memchr(readPosition(),'\n',readAbleSize()); return res; } // 通常獲取一行數(shù)據(jù),這種情況針對(duì)是: std::string getLine() { char* pos = FindCRLF(); if (pos == NULL) { return ""; } // +1 為了把換行數(shù)據(jù)取出來(lái)! return readAsString(pos - readPosition() + 1); } std::string getLineAndPop() { std::string str = getLine(); moveReadOffset(str.size()); return str; } void Clear() { // 清空緩沖區(qū)!clear // 只需要將偏移量歸0即可! _read_idx = 0; _write_idx = 0; } };
4、2 Socket模塊
? 我們?cè)诰帉?xiě)服務(wù)器時(shí),少不了的肯定是需要Socket套接字編程的。Socket模塊就是對(duì)網(wǎng)絡(luò)套接字編程進(jìn)行一個(gè)封裝,方便我們后面直接進(jìn)行相關(guān)操作。主要功能如下:
- 創(chuàng)建套接字(socket);
- 綁定地址信息(bind);
- 開(kāi)始監(jiān)聽(tīng)(listen);
- 向服務(wù)器發(fā)起連接(connect);
- 獲取新連接(accept);
- 接受數(shù)據(jù)(recv);
- 發(fā)送數(shù)據(jù)(send);
- 關(guān)閉套接字(close);
- 創(chuàng)建一個(gè)監(jiān)聽(tīng)鏈接;
- 創(chuàng)建一個(gè)客戶端連接;
- 開(kāi)啟地址和端口重用;
- 設(shè)置套接字為非阻塞。
? 這里對(duì)簡(jiǎn)單的一些網(wǎng)絡(luò)套接字接口就不再過(guò)多解釋?zhuān)瑢?duì)上述功能的后四點(diǎn)進(jìn)行簡(jiǎn)單解釋。我們先來(lái)看一下該模塊的代碼實(shí)現(xiàn):
#define MAX_LISTEN 1024 class Socket { public: Socket() : _sockfd(-1) {} Socket(int fd) : _sockfd(fd) {} ~Socket() { Close(); } int Fd() { return _sockfd; } bool Create() { _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (_sockfd < 0) { ERR_LOG("create socket failed!"); return false; } return true; } bool Bind(const std::string ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = bind(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG("bind sockfd failed!"); return false; } return true; } bool Listen(int backlog = MAX_LISTEN) { int ret = listen(_sockfd, backlog); if (ret < 0) { ERR_LOG("listen sockfd failed!"); return false; } return true; } // 向服務(wù)器發(fā)起連接 bool Connect(const std::string ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = connect(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG("connect server failed!"); return false; } return true; } int Accept() { int newfd = accept(_sockfd, nullptr, nullptr); if (newfd < 0) { ERR_LOG("accept socker failed"); return -1; } return newfd; } ssize_t Recv(void *buf, size_t len, int flag = 0) { ssize_t ret = recv(_sockfd, buf, len, flag); if (ret <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0; ERR_LOG("recv msg failed!"); return -1; } return ret; } ssize_t NonBlockRecv(void *buf, size_t len) { return Recv(buf, len, MSG_DONTWAIT); } ssize_t Send(const void *buf, size_t len, int flag = 0) { ssize_t ret = send(_sockfd, buf, len, flag); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0; ERR_LOG("send msg failed!"); return -1; } return ret; } ssize_t NonBlockSend(const void *buf, size_t len) { return Send(buf, len, MSG_DONTWAIT); } void Close() { if (_sockfd) { close(_sockfd); _sockfd = -1; } } // 創(chuàng)建一個(gè)服務(wù)器端連接 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) { if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; } // 創(chuàng)建一個(gè)客戶端連接 bool CreateClient(uint16_t port, const std::string &ip) { if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; } // 設(shè)置套接字選項(xiàng) —— 開(kāi)啟地址端口重用 void ReuseAddress() { int val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(val)); } // 設(shè)置套接字為非阻塞 void NonBlock() { int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); } private: int _sockfd; };
? 我們知道在Tcp通信當(dāng)中,建立連接會(huì)有三次握手,斷開(kāi)連接會(huì)有四次揮手。而主動(dòng)斷開(kāi)鏈接的一方在進(jìn)行第四次揮手的時(shí)候會(huì)變成TIME_WAIT狀態(tài),也就四需要等上一段時(shí)間該鏈接才算斷開(kāi)釋放。這也就意味著主動(dòng)斷開(kāi)連接的一方并不能很快的重新建立連接。為了解決這種情況,可以通過(guò)setsockopt函數(shù)進(jìn)行設(shè)置套接字選項(xiàng),開(kāi)啟地址和端口重用。具體封裝后的代碼如下:
void ReuseAddress() { int val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(val)); }
? 我們知道,調(diào)用read讀取數(shù)據(jù)的時(shí)候,如果底層數(shù)據(jù)不就緒,默認(rèn)情況下是阻塞的。在我們實(shí)現(xiàn)的服務(wù)器時(shí),并不像讓其阻塞。如果在讀取數(shù)據(jù)時(shí)阻塞了,就會(huì)導(dǎo)致其他的任務(wù)得不到很好的執(zhí)行。所以我們還需要一個(gè)對(duì)套接字設(shè)置非阻塞的功能。封裝后的代碼如下:
void NonBlock() { int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); }
? 需要注意的是,當(dāng)read函數(shù)以非阻塞方式讀取數(shù)據(jù)時(shí),如果底層數(shù)據(jù)不就緒,那么read函數(shù)就會(huì)立即返回,但當(dāng)?shù)讓訑?shù)據(jù)不就緒時(shí),read函數(shù)是以出錯(cuò)的形式返回的,此時(shí)的錯(cuò)誤碼會(huì)被設(shè)置為
EAGAIN
或EWOULDBLOCK
。? 因此在以非阻塞方式讀取數(shù)據(jù)時(shí),如果調(diào)用read函數(shù)時(shí)得到的返回值是-1,此時(shí)還需要通過(guò)錯(cuò)誤碼進(jìn)一步進(jìn)行判斷,如果錯(cuò)誤碼的值是EAGAIN或EWOULDBLOCK,說(shuō)明本次調(diào)用read函數(shù)出錯(cuò)是因?yàn)榈讓訑?shù)據(jù)還沒(méi)有就緒,因此后續(xù)還應(yīng)該繼續(xù)調(diào)用read函數(shù)進(jìn)行輪詢檢測(cè)數(shù)據(jù)是否就緒,當(dāng)數(shù)據(jù)繼續(xù)時(shí)再進(jìn)行數(shù)據(jù)的讀取。
? 此外,調(diào)用read函數(shù)在讀取到數(shù)據(jù)之前可能會(huì)被其他信號(hào)中斷,此時(shí)read函數(shù)也會(huì)以出錯(cuò)的形式返回,此時(shí)的錯(cuò)誤碼會(huì)被設(shè)置為EINTR,此時(shí)應(yīng)該重新執(zhí)行read函數(shù)進(jìn)行數(shù)據(jù)的讀取。
??因此在以非阻塞的方式讀取數(shù)據(jù)時(shí),如果調(diào)用read函數(shù)讀取到的返回值為-1,此時(shí)并不應(yīng)該直接認(rèn)為read函數(shù)在底層讀取數(shù)據(jù)時(shí)出錯(cuò)了,而應(yīng)該繼續(xù)判斷錯(cuò)誤碼,如果錯(cuò)誤碼的值為
EAGAIN
、EWOULDBLOCK
或EINTR
則應(yīng)該繼續(xù)調(diào)用read函數(shù)再次進(jìn)行讀取或者說(shuō)明底層沒(méi)有數(shù)據(jù)。
? 創(chuàng)建一個(gè)監(jiān)聽(tīng)連接是什么意思呢?當(dāng)我們服務(wù)端創(chuàng)建套接字、綁定ip和端口后,需要將該套接字設(shè)置為監(jiān)聽(tīng)狀態(tài),以上過(guò)程就是在創(chuàng)建一個(gè)監(jiān)聽(tīng)的連接,也就是創(chuàng)建一個(gè)服務(wù)端連接。我們對(duì)上述的過(guò)程進(jìn)行了封裝,具體封裝后的代碼如下:‘
// 創(chuàng)建一個(gè)服務(wù)器端連接 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) { if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; }
? 創(chuàng)建客戶端連接是什么意思?無(wú)非就是創(chuàng)建一個(gè)套接字,然后向服務(wù)器發(fā)起請(qǐng)求連接。這也是客戶端所需要做的。我們對(duì)其進(jìn)行簡(jiǎn)單封裝后代碼如下:
// 創(chuàng)建一個(gè)客戶端連接 bool CreateClient(uint16_t port, const std::string &ip) { if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; }
4、3 Channel模塊
? 高性能服務(wù)器必備的就是多路轉(zhuǎn)接技術(shù)。當(dāng)然,我們的項(xiàng)目也不例外。我們需要利用多路轉(zhuǎn)接技術(shù)來(lái)幫我們進(jìn)行等待(監(jiān)控)事件就緒。且當(dāng)有事件就緒時(shí),會(huì)有一個(gè)Handler函數(shù)根據(jù)所觸發(fā)的就緒事件統(tǒng)一幫我們處理就緒后的操作。
? 每個(gè)通信套接字都會(huì)有許多不同的事件,例如:讀事件、寫(xiě)事件等等。為了方便我們后續(xù)對(duì)描述符(套接字)的監(jiān)控事件在用戶態(tài)更容易維護(hù),以及觸發(fā)事件后的操作流程更加的清晰,我們?cè)谶@里對(duì)描述符(套接字)監(jiān)控的事件和管理進(jìn)行封裝。那么Channel模塊的主要功能就很清晰了。
1.對(duì)監(jiān)控事件的管理:
- 判斷描述符是否可讀;
- 判斷描述符是否可寫(xiě);
- 對(duì)描述符監(jiān)控添加可讀;
- 對(duì)描述符監(jiān)控添加可寫(xiě);
- 解除可讀事件監(jiān)控;
- 解除可寫(xiě)事件監(jiān)控;
- 解除所有事件監(jiān)控。
2.對(duì)監(jiān)控事件觸發(fā)后的處理:
- 設(shè)置對(duì)于不同事件的回調(diào)處理函數(shù);
- 明確觸發(fā)了某個(gè)事件該如何處理。
? 我們先看一下Channel模塊的代碼:
class Channel { private: int _fd; uint32_t events; // 當(dāng)前需要監(jiān)控的事件 uint32_t revents; // 當(dāng)前連接觸發(fā)的事件 using eventCallback = std::function < void() > ; eventCallback _read_callback; // 可讀事件被觸發(fā)的回調(diào)函數(shù) eventCallback _error_callback; // 可寫(xiě)事件被觸發(fā)的回調(diào)函數(shù) eventCallback _close_callback; // 連接關(guān)閉事件被觸發(fā)的回調(diào)函數(shù) eventCallback _event_callback; // 任意事件被觸發(fā)的回調(diào)函數(shù) eventCallback _write_callback; // 可寫(xiě)事件被觸發(fā)的回調(diào)函數(shù) public: Channel(int fd) : fd(_fd) {} int Fd() { return _fd; } void SetRevents(uint32_t events) { _revents = events; } void setReadCallback(const eventCallback &cb) { _read_callback = cb; } void setWriteCallback(const eventCallback &cb) { _write_callback = cb; } void setErrorCallback(const eventCallback &cb) { _error_callback = cb; } void setCloseCallback(const eventCallback &cb) { _close_callback = cb; } void setEventCallback(const eventCallback &cb) { _event_callback = cb; } bool readAble() { // 當(dāng)前是否可讀 return (_events & EPOLLIN); } bool writeAble() { // 當(dāng)前是否可寫(xiě) return (_events & EPOLLOUT); } void enableRead() { // 啟動(dòng)讀事件監(jiān)控 _events |= EPOLLIN; // 后面會(huì)添加到EventLoop的事件監(jiān)控! } void enableWrite() { // 啟動(dòng)寫(xiě)事件監(jiān)控 _events |= EPOLLOUT; // 后面會(huì)添加到EventLoop的事件監(jiān)控! } void disableRead() { // 關(guān)閉讀事件監(jiān)控 _events &= ~EPOLLIN; // 后面會(huì)修改到EventLoop的事件監(jiān)控! } void disableWrite() { // 關(guān)閉寫(xiě)事件監(jiān)控 _events &= ~EPOLLOUT; } void disableAll() { // 關(guān)閉所有事件監(jiān)控 _events = 0; } void Remove(); // 后面會(huì)調(diào)用EventLoop接口移除監(jiān)控 void HandleEvent() { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) _read_callback(); } /*有可能會(huì)釋放連接的操作事件,一次只處理一個(gè)*/ if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); // 一旦出錯(cuò),就會(huì)釋放連接,因此要放到前邊調(diào)用任意回調(diào) } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } /*不管任何事件,都調(diào)用的回調(diào)函數(shù)*/ if (_event_callback) _event_callback(); } };
? 注意,我們?cè)谶@里使用多路轉(zhuǎn)接技術(shù)時(shí),采用的時(shí)epoll。因?yàn)閑poll的編碼簡(jiǎn)單,且效率最高。所以在私有成員變量時(shí),我們給出了監(jiān)控事件和就緒事件。
? 同時(shí),我們這里使用了通用的函數(shù)封裝器function。原因就是我們并不知道所觸發(fā)事件的回調(diào)函數(shù)所需要的參數(shù)。當(dāng)在設(shè)置回調(diào)函數(shù)時(shí),使用函數(shù)包裝器bind進(jìn)行綁定參數(shù)即可。
? 在Handler也就是上述的HandlerEvent函數(shù)中,我們對(duì)所觸發(fā)的事件需要回調(diào)進(jìn)行了分類(lèi)。讀事件觸發(fā)后,并不會(huì)直接釋放連接(后續(xù)會(huì)講解原因)。其他事件觸發(fā)后,都有可能導(dǎo)致連接被釋放,所以一次處理一個(gè)事件,以防連接被釋放的情況下再去處理事件就會(huì)導(dǎo)致陳鼓型崩潰。
4、4 Poller模塊
? 上述的Channel模塊是對(duì)描述符的監(jiān)控事件進(jìn)行管理的封裝?,F(xiàn)在我們還需要對(duì)描述符進(jìn)行IO事件監(jiān)控啊!說(shuō)明這兩個(gè)模塊是密切關(guān)聯(lián)的。
? 上述我們也提到了,所用的多路轉(zhuǎn)接模型是epoll。那么該模塊就是對(duì)epoll的操作進(jìn)行封裝的。封裝思想:
- 必須擁有一個(gè)epoll的操作句柄;
- 擁有一個(gè)struct epoll_event 結(jié)構(gòu)數(shù)組,監(jiān)控保存所有的活躍事件;
- 使用一個(gè)哈希表管理描述符與描述符對(duì)應(yīng)的事件管理Channnel對(duì)象。
? 整體邏輯流程:
- 對(duì)描述符進(jìn)行監(jiān)控,通過(guò)Channnel才能知道描述符監(jiān)控什么事件(注意,我們?cè)谑褂胑poll對(duì)事件監(jiān)控前,一定是在Channel模塊中對(duì)所需要監(jiān)控的事件events進(jìn)行了設(shè)置,然后再使用epoll進(jìn)行監(jiān)控);
- 當(dāng)描述符就緒了,通過(guò)描述符在哈希表中找到對(duì)應(yīng)的Channel(當(dāng)然,我們都會(huì)添加Channel到哈希表種的。得到了Channel才知道什么事件如何處理)當(dāng)描述符就緒了,返回就緒描述符對(duì)應(yīng)的Channel。
? 通過(guò)對(duì)上述的了解,我們就已經(jīng)知道該模塊所需要實(shí)現(xiàn)的功能了。具體如下:
- 添加事件監(jiān)控 (channel模塊);
- 修改事件監(jiān)控;
- 移除事件監(jiān)控;
- 開(kāi)始事件監(jiān)控。
? 具體該模塊實(shí)現(xiàn)代碼如下:
#define MAX_EPOLLEVENTS 1024 // Poller模塊是對(duì)epoll進(jìn)?封裝的?個(gè)模塊,主要實(shí)現(xiàn)epoll的IO事件添加,修改,移除,獲取活躍連接功能。 class Poller { private: int _epfd; struct epoll_event _evs[MAX_EPOLLEVENTS]; std::unordered_map<int, Channel *> _channels; private: // 對(duì)epoll直接操作 void Update(Channel *channel, int op) { int fd = channel->Fd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->Events(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) { ERR_LOG("EPOLLCTL FAILED!!!"); abort(); // 推出程序??! } } // 判斷一個(gè)Channel是否已經(jīng)添加到了事件監(jiān)控 bool hashChannel(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it == _channels.end()) { return false; } return true; } public: Poller() { _epfd = epoll_create(MAX_EPOLLEVENTS); if (_epfd < 0) { ERR_LOG("EPOLL CREATE FAILED!!"); abort(); // 退出程序 } } // 添加或者修改監(jiān)控事件 void UpdateEvent(Channel *channel) { // 有描述符也有事件 bool ret = hashChannel(channel); if (ret == false) { _channels.insert(std::make_pair(channel->Fd(), channel)); return Update(channel, EPOLL_CTL_ADD); // 不存在添加 } return Update(channel, EPOLL_CTL_MOD); // 存在了更新 } // 移除監(jiān)控事件 void removeEvent(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it != _channels.end()) { _channels.erase(it); } Update(channel, EPOLL_CTL_DEL); } // 開(kāi)始監(jiān)控,返回活躍鏈接! void Poll(std::vector<Channel *> *active) { // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout) int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); if (nfds < 0) { if (errno == EINTR) { return; } ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno)); abort(); // 退出程序 } for (int i = 0; i < nfds; i++) { auto it = _channels.find(_evs[i].data.fd); assert(it != _channels.end()); it->second->setRevents(_evs[i].events); // 設(shè)置實(shí)際就緒的事件 active->push_back(it->second); } return; } };
? 這個(gè)模塊主要都是封裝的對(duì)epoll的操作。其中需要注意的是,我們?cè)趯?duì)事件開(kāi)始監(jiān)控時(shí),需要將不同描述符就緒的事件進(jìn)行返回,以便我們后續(xù)進(jìn)行操作。所以這里就傳入了一個(gè)指針,作為輸出型參數(shù)。
4、5 Eventloop模塊
??這個(gè)模塊其實(shí)就是我們所說(shuō)的 one thread one loop中的loop,也是我們所說(shuō)的reactor。這個(gè)模塊必然是一個(gè)模塊對(duì)應(yīng)一個(gè)線程。這個(gè)模塊是干什么的呢?其實(shí)就是進(jìn)行事件監(jiān)控管理和事件處理的模塊。你也可以理解為對(duì)Channel模塊和Poller模塊的整合。接下來(lái)我們?cè)敿?xì)解釋一下該模塊的思路講解。
? EventLoop模塊是進(jìn)行時(shí)間監(jiān)控,以及事件處理的模塊。同時(shí)這個(gè)模塊還是與線程一一對(duì)應(yīng)的。監(jiān)控了一個(gè)鏈接,而這個(gè)連接一旦就緒,就要進(jìn)行事件處理。假如一個(gè)線程正在執(zhí)行就緒事件,那么該連接再有其他事件就緒呢?會(huì)不會(huì)就被分配到其他線程了呢?但是如果這個(gè)描述符在多個(gè)線程中都出發(fā)了事件進(jìn)行處理,就會(huì)存在線程安全的問(wèn)題。因此我們需要將一個(gè)連接的事件監(jiān)控,以及連接事件的處理,以及其他操作都放在同一個(gè)線程當(dāng)中進(jìn)行。
? 但是問(wèn)題又來(lái)了:如何保證一個(gè)連接所有的操作都在eventloop對(duì)應(yīng)的線程中執(zhí)行呢?我們可以在eventloop模塊中添加一個(gè)任務(wù)隊(duì)列,對(duì)連接的所有操作,都進(jìn)行一次封裝,將對(duì)連接的操作并不直接執(zhí)行,而是當(dāng)作任務(wù)添加到任務(wù)隊(duì)列當(dāng)中去。
? 總結(jié)eventloop處理流程:
- 在線程中對(duì)描述符進(jìn)行事件監(jiān)控;
- 有描述符就緒,則對(duì)描述符進(jìn)行事件處理(必須保證處理回調(diào)函數(shù)中的操作都在線程當(dāng)中);
- 所有的就緒事件處理完了,這時(shí)候再去將任務(wù)隊(duì)列中的任務(wù)一一執(zhí)行。
? 事件監(jiān)控就交給Poller模塊來(lái)處理,有事件就緒了則進(jìn)行處理事件。但是有一個(gè)需要注意的點(diǎn):因?yàn)橛锌赡芤驗(yàn)榈却枋龇鸌O事件就緒,導(dǎo)致執(zhí)行流流程阻塞,這時(shí)候任務(wù)隊(duì)列中的任務(wù)將得不到執(zhí)行,因此得有一個(gè)事件通知的東西,能夠喚醒事件監(jiān)控的阻塞。
? 我們?cè)賮?lái)看一下eventfd函數(shù)。如下圖:
??eventfd:一種事件通知機(jī)制,該函數(shù)就是創(chuàng)建一個(gè)描述符用于實(shí)現(xiàn)事件通知,eventfd本質(zhì)在內(nèi)核里邊管理的就是一個(gè)計(jì)數(shù)器。創(chuàng)建eventfd就會(huì)在內(nèi)核中創(chuàng)建一個(gè)計(jì)數(shù)器(結(jié)構(gòu)),每當(dāng)向evenfd中寫(xiě)入一個(gè)數(shù)值--用于表示事件通知次數(shù),可以使用read進(jìn)行數(shù)據(jù)的讀取,讀取到的數(shù)據(jù)就是通知的次數(shù)。假設(shè)每次給eventfd中寫(xiě)入一個(gè)1,就表示通知了一次,連續(xù)寫(xiě)了三次之后,再去read讀取出來(lái)的數(shù)字就是3,讀取之后計(jì)數(shù)清0。用處:在EventLoop模塊中實(shí)現(xiàn)線程間的事件通知功能。eventfd也是通過(guò)read/write/close進(jìn)行操作的。
? 接下來(lái)我們看一下該模塊的代碼實(shí)現(xiàn):
class EventLoop { private: using Functor = std::function<void()>; std::thread::id _thread_id; // 線程ID int _event_fd; // eventfd 喚醒IO事件監(jiān)控有可能的阻塞!??! std::unique_ptr<Channel> _event_channel; Poller _poller; // 進(jìn)行所有描述符的事件監(jiān)控 std::vector<Functor> _tasks; // 任務(wù)池 std::mutex _mutex; // 實(shí)現(xiàn)任務(wù)池操作的線程安全!??! public: // 執(zhí)行任務(wù)池中的所有任務(wù)!! void runAllTask() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); // 出了作用域,鎖就會(huì)被解開(kāi)??! _tasks.swap(functor); } for (auto &f : functor) { f(); } return; } static int createEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERR_LOG("CREATE ENVENTED FAILED !!!"); abort(); } return efd; } void readEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { if (errno == EINTR || errno == EAGAIN) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return; } void weakEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (errno == EINTR) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return; } public: EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(createEventFd()), _event_channel(new Channel(this, _event_fd)), { // 給eventfd添加可讀事件回調(diào)函數(shù),讀取eventfd事件通知次數(shù) _event_channel->setReadCallback(std::bind(&EventLoop::readEventfd, this)); // 啟動(dòng)eventfd的讀事件監(jiān)控 _event_channel->enableRead(); } void runInLoop(const Functor &cb) { // 判斷將要執(zhí)行的任務(wù)是否處于當(dāng)前線程中,如果是則執(zhí)行,不是則壓入隊(duì)列。 if (isInLoop()) { return cb(); } return QueueInLoop(cb); } void queueInLoop(const Functor &cb) { // 將操作壓入任務(wù)池! std::unique_lock<std::mutex> _lock(_mutex); // 喚醒有可能因?yàn)闆](méi)有事件就緒,而導(dǎo)致的epoll阻塞; // 其實(shí)就是給eventfd寫(xiě)入一個(gè)數(shù)據(jù),eventfd就會(huì)觸發(fā)可讀事件 _tasks.push_back(cb); weakEventFd(); } bool isInLoop() { // 永遠(yuǎn)判斷當(dāng)前線程是否是EventLoop所對(duì)應(yīng)的線程 return (_thread_id == std::this_thread::get_id()); } void updateEvent(Channel *channel) { // 添加/修改描述符的事件監(jiān)控 return _poller.UpdateEvent(channel); } void removeEvent(Channel *channel) { // 移除描述符的監(jiān)控 return _poller.removeEvent(channel); } void Start() { // 任務(wù)監(jiān)控完畢進(jìn)行處理任務(wù)! // 三步走:事件監(jiān)控-》就緒事件處理-》執(zhí)行任務(wù) std::vector<Channel *> actives; _poller.Poll(&actives); for (auto &channel : actives) { channel->handleEvent(); } runAllTask(); } };
4、5、1 時(shí)間輪思想
??現(xiàn)在我們想設(shè)置一個(gè)超時(shí)連接釋放的功能。所謂超市連接釋放,其實(shí)就是一個(gè)在我們?cè)O(shè)置的一段時(shí)間內(nèi),如果該連接沒(méi)有任何IO事件就緒,我們就認(rèn)為他是一個(gè)不活躍連接,直接釋放即可!假設(shè)我們只使用定時(shí)器,存在一個(gè)很大的問(wèn)題,每次超時(shí)都要將所有的連接遍歷一遍(因?yàn)槊總€(gè)連接的超時(shí)間可能是不同的),如果有上萬(wàn)個(gè)連接,效率無(wú)疑是較為低下的。這時(shí)候大家就會(huì)想到,我們可以針對(duì)所有的連接,根據(jù)每個(gè)連接最近一次通信的系統(tǒng)時(shí)間建立一個(gè)小根堆,這樣只需要每次針對(duì)堆頂部分的連接逐個(gè)釋放,直到?jīng)]有超時(shí)的連接為止,這樣也可以大大提高處理的效率。
??上述方法可以實(shí)現(xiàn)定時(shí)任務(wù),但是這里給大家介紹另一種方案:時(shí)間輪。時(shí)間輪的思想來(lái)源于鐘表,如果我們定了一個(gè)3點(diǎn)鐘的鬧鈴,則當(dāng)時(shí)針走到3的時(shí)候,就代表時(shí)間到了。同樣的道理,如果我們定義了一個(gè)數(shù)組,并且有一個(gè)指針,指向數(shù)組起始位置,這個(gè)指針每秒鐘向后走動(dòng)一步,走到哪里,則代表哪里的任務(wù)該被執(zhí)行了,那么如果我們想要定一個(gè)3s后的任務(wù),則只需要將任務(wù)添加到tick+3位置,則每秒中走一步,三秒鐘后tick走到對(duì)應(yīng)位置,這時(shí)候執(zhí)行對(duì)應(yīng)位置的任務(wù)即可。但是,同一時(shí)間可能會(huì)有大批量的定時(shí)任務(wù),因此我們可以給數(shù)組對(duì)應(yīng)位置下拉一個(gè)數(shù)組,這樣就可以在同一個(gè)時(shí)刻上添加多個(gè)定時(shí)任務(wù)了。
??當(dāng)然,上述操作也有一些缺陷,比如我們?nèi)绻x一個(gè)60s后的任務(wù),則需要將數(shù)組的元素個(gè)數(shù)設(shè)置為60才可以,如果設(shè)置一小時(shí)后的定時(shí)任務(wù),則需要定義3600個(gè)元素的數(shù)組,這樣無(wú)疑是比較麻煩的。
??因此,可以采用多層級(jí)的時(shí)間輪,有秒針輪,分針輪,時(shí)針輪,60<time<3600則time/60就是分針輪對(duì)應(yīng)存儲(chǔ)的位置,當(dāng)tick/3600等于對(duì)應(yīng)位置的時(shí)候,將其位置的任務(wù)向分針,秒針輪進(jìn)行移動(dòng)。
??因?yàn)楫?dāng)前我們的應(yīng)用中,倒是不用設(shè)計(jì)的這么麻煩,因?yàn)槲覀兊亩〞r(shí)任務(wù)通常設(shè)置的30s以內(nèi),所以簡(jiǎn)單的單層時(shí)間輪就夠用了。
??但是,我們也得考慮一個(gè)問(wèn)題,當(dāng)前的設(shè)計(jì)是時(shí)間到了,則主動(dòng)去執(zhí)行定時(shí)任務(wù),釋放連接,那能不能在時(shí)間到了后,自動(dòng)執(zhí)行定時(shí)任務(wù)呢,這時(shí)候我們就想到一個(gè)操作-類(lèi)的析構(gòu)函數(shù)。
??一個(gè)類(lèi)的析構(gòu)函數(shù),在對(duì)象被釋放時(shí)會(huì)自動(dòng)被執(zhí)行,那么我們?nèi)绻麑⒁粋€(gè)定時(shí)任務(wù)作為一個(gè)類(lèi)的析構(gòu)函數(shù)內(nèi)的操作,則這個(gè)定時(shí)任務(wù)在對(duì)象被釋放的時(shí)候就會(huì)執(zhí)行。
??但是僅僅為了這個(gè)目的,而設(shè)計(jì)一個(gè)額外的任務(wù)類(lèi),好像有些不劃算,但是這里我們又要考慮另一個(gè)問(wèn)題,那就是假如有一個(gè)連接建立成功了,我們給這個(gè)連接設(shè)置了一個(gè)30s后的定時(shí)銷(xiāo)毀任務(wù),但是在第10s的時(shí)候,這個(gè)連接進(jìn)行了一次通信,那么我們應(yīng)該時(shí)在第30s的時(shí)候關(guān)閉,還是第40s的時(shí)候關(guān)閉呢?無(wú)疑應(yīng)該是第40s的時(shí)候。也就是說(shuō),這時(shí)候,我們需要讓這個(gè)第30s的任務(wù)失效,但是我們?cè)撊绾螌?shí)現(xiàn)這個(gè)操作呢?
??這里,我們就用到了智能指針shared_ptr,shared_ptr有個(gè)計(jì)數(shù)器,當(dāng)計(jì)數(shù)為0的時(shí)候,才會(huì)真正釋放一個(gè)對(duì)象,那么如果連接在第10s進(jìn)行了一次通信,則我們繼續(xù)向定時(shí)任務(wù)中,添加一個(gè)30s后(也就是第40s)的任務(wù)類(lèi)對(duì)象的shared_ptr,則這時(shí)候兩個(gè)任務(wù)shared_ptr計(jì)數(shù)為2,則第30s的定時(shí)任務(wù)被釋放的時(shí)候,計(jì)數(shù)-1,變?yōu)?,并不為0,則并不會(huì)執(zhí)行實(shí)際的析構(gòu)函數(shù),那么就相當(dāng)于這個(gè)第30s的任務(wù)失效了,只有在第40s的時(shí)候,這個(gè)任務(wù)才會(huì)被真正釋放。下面我們來(lái)看一下其具體的實(shí)現(xiàn)如下:
using TaskFunc = std::function<void()>; using ReleaseFunc = std::function<void()>; class TimeTask { public: TimeTask(uint64_t id, uint32_t timeout, const TaskFunc& cb) :_id(id) ,_timeout(timeout) ,_task_cb(cb) ,_canceled(false) {} ~TimeTask() { if(_canceled == false) _task_cb(); _release(); } void Cancel() { _canceled = true; } void SetRelease(const ReleaseFunc& cb) { _release = cb; } uint32_t Delaytime() { return _timeout; } private: uint64_t _id; uint32_t _timeout; bool _canceled; TaskFunc _task_cb; ReleaseFunc _release; }; class TimeWheel { public: TimeWheel() :_tick(0) ,_capacity(60) ,_wheel(_capacity) {} void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb) { PtrTask pt(new TimeTask(id, delay, cb)); pt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); // std::cout <<"添加任務(wù)成功, 任務(wù)id:" << id << std::endl; } void TimeRefresh(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) return; PtrTask pt = it->second.lock(); int delay = pt->Delaytime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); // std::cout <<"刷新定時(shí)任務(wù)成功, 任務(wù)id:" << id << std::endl; } void TimeCancel(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) return; PtrTask pt = it->second.lock(); if(pt) pt->Cancel(); } void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); } private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) return; _timers.erase(id); } private: using WeakTask = std::weak_ptr<TimeTask>; using PtrTask = std::shared_ptr<TimeTask>; int _tick; int _capacity; std::vector<std::vector<PtrTask>> _wheel; std::unordered_map<uint64_t, WeakTask> _timers; };
? 下面是測(cè)試代碼,大家可自行測(cè)試?yán)斫猓?/p>
#include <iostream> #include "timewheel.hpp" class Test { public: Test() { std::cout << "構(gòu)造" << std::endl; } ~Test() { std::cout << "析構(gòu)" << std::endl; } }; void DelTest(Test *t) { delete t; } int main() { TimeWheel tw; Test* t = new Test(); tw.TimerAdd(888, 3, std::bind(DelTest, t)); for(int i = 0; i < 5; i++) { sleep(1); tw.TimeRefresh(888); tw.RunTimerTask(); std::cout << "Test 定時(shí)任務(wù)被重新定時(shí)執(zhí)行" << std::endl; } while(true) { sleep(1); std::cout <<"tick 移動(dòng)了一部" << std::endl; tw.RunTimerTask(); } return 0; }
4、5、2?TimerWheel定時(shí)器模塊整合
? 現(xiàn)在我們想設(shè)置一個(gè)超時(shí)連接釋放的功能,就需要借助我們上述的定時(shí)功能了。首先我們需要將一個(gè)連接任務(wù)保存起來(lái),然后我們采用時(shí)間輪的思想,就是每一秒向后走一個(gè)位置,如果該位置有任務(wù),那么說(shuō)明該位置的任務(wù)超時(shí)了,需要釋放。因?yàn)槎〞r(shí)器任務(wù)需要被監(jiān)控起來(lái),每當(dāng)超過(guò)我們所定時(shí)的事件,就會(huì)自動(dòng)往fd中寫(xiě)入一個(gè)數(shù)據(jù),所以我們可以通過(guò)EventLoop將其進(jìn)行監(jiān)控管理。
? 當(dāng)我們有一個(gè)連接創(chuàng)建后,就為該連接添加一個(gè)秒級(jí)別的定時(shí)任務(wù)。同時(shí)這只一個(gè)一秒觸發(fā)一次的定時(shí)器。當(dāng)我們添加一個(gè)定時(shí)任務(wù)后,同時(shí)為該定時(shí)任務(wù)創(chuàng)建一個(gè)定時(shí)器,把該定時(shí)器的timerfd添加可讀監(jiān)控,每當(dāng)觸發(fā)可讀事件就緒時(shí),我們?cè)O(shè)置了回調(diào),就會(huì)調(diào)用回調(diào)函數(shù)去讀取的超時(shí)次數(shù),然后秒針再向后走對(duì)應(yīng)步數(shù)即可。
class TimerTask { private: uint64_t _id; // 定時(shí)器任務(wù)對(duì)象ID uint32_t _timeout; // 定時(shí)任務(wù)的超時(shí)時(shí)間 bool _canceled; // false-表示沒(méi)有被取消, true-表示被取消 TaskFunc _task_cb; // 定時(shí)器對(duì)象要執(zhí)行的定時(shí)任務(wù) ReleaseFunc _release; // 用于刪除TimerWheel中保存的定時(shí)器對(duì)象信息 public: TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id) , _timeout(delay) , _task_cb(cb) , _canceled(false) {} ~TimerTask() { if (_canceled == false) _task_cb(); _release(); } void Cancel() { _canceled = true; } void SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t DelayTime() { return _timeout; } }; class TimerWheel { private: using WeakTask = std::weak_ptr<TimerTask>; using PtrTask = std::shared_ptr<TimerTask>; int _tick; // 當(dāng)前的秒針,走到哪里釋放哪里,釋放哪里,就相當(dāng)于執(zhí)行哪里的任務(wù) int _capacity; // 表盤(pán)最大數(shù)量---其實(shí)就是最大延遲時(shí)間 std::vector<std::vector<PtrTask>> _wheel; std::unordered_map<uint64_t, WeakTask> _timers; EventLoop *_loop; int _timerfd; // 定時(shí)器描述符--可讀事件回調(diào)就是讀取計(jì)數(shù)器,執(zhí)行定時(shí)任務(wù) std::unique_ptr<Channel> _timer_channel; private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it != _timers.end()) { _timers.erase(it); } } static int CreateTimerfd() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if (timerfd < 0) { ERR_LOG("TIMERFD CREATE FAILED!"); abort(); } // int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); struct itimerspec itime; itime.it_value.tv_sec = 1; itime.it_value.tv_nsec = 0; // 第一次超時(shí)時(shí)間為1s后 itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; // 第一次超時(shí)后,每次超時(shí)的間隔時(shí) timerfd_settime(timerfd, 0, &itime, NULL); return timerfd; } int ReadTimefd() { uint64_t times; // 有可能因?yàn)槠渌枋龇氖录幚砘ㄙM(fèi)事件比較長(zhǎng),然后在處理定時(shí)器描述符事件的時(shí)候,有可能就已經(jīng)超時(shí)了很多次 // read讀取到的數(shù)據(jù)times就是從上一次read之后超時(shí)的次數(shù) int ret = read(_timerfd, ×, 8); if (ret < 0) { ERR_LOG("READ TIMEFD FAILED!"); abort(); } return times; } // 這個(gè)函數(shù)應(yīng)該每秒鐘被執(zhí)行一次,相當(dāng)于秒針向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); // 清空指定位置的數(shù)組,就會(huì)把數(shù)組中保存的所有管理定時(shí)器對(duì)象的shared_ptr釋放掉 } void OnTime() { // 根據(jù)實(shí)際超時(shí)的次數(shù),執(zhí)行對(duì)應(yīng)的超時(shí)任務(wù) int times = ReadTimefd(); for (int i = 0; i < times; i++) { RunTimerTask(); } } void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) { PtrTask pt(new TimerTask(id, delay, cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); } void TimerRefreshInLoop(uint64_t id) { // 通過(guò)保存的定時(shí)器對(duì)象的weak_ptr構(gòu)造一個(gè)shared_ptr出來(lái),添加到輪子中 auto it = _timers.find(id); if (it == _timers.end()) { return; // 沒(méi)找著定時(shí)任務(wù),沒(méi)法刷新,沒(méi)法延遲 } PtrTask pt = it->second.lock(); // lock獲取weak_ptr管理的對(duì)象對(duì)應(yīng)的shared_ptr int delay = pt->DelayTime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } void TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return; // 沒(méi)找著定時(shí)任務(wù),沒(méi)法刷新,沒(méi)法延遲 } PtrTask pt = it->second.lock(); if (pt) pt->Cancel(); } public: TimerWheel(EventLoop *loop) : _capacity(60) , _tick(0) , _wheel(_capacity) , _loop(loop) , _timerfd(CreateTimerfd()) , _timer_channel(new Channel(_loop, _timerfd)) { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _timer_channel->EnableRead(); // 啟動(dòng)讀事件監(jiān)控 } /*定時(shí)器中有個(gè)_timers成員,定時(shí)器信息的操作有可能在多線程中進(jìn)行,因此需要考慮線程安全問(wèn)題*/ /*如果不想加鎖,那就把對(duì)定期的所有操作,都放到一個(gè)線程中進(jìn)行*/ void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb); // 刷新/延遲定時(shí)任務(wù) void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); /*這個(gè)接口存在線程安全問(wèn)題--這個(gè)接口實(shí)際上不能被外界使用者調(diào)用,只能在模塊內(nèi),在對(duì)應(yīng)的EventLoop線程內(nèi)執(zhí)行*/ bool HasTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; } }; void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb)); } // 刷新/延遲定時(shí)任務(wù) void TimerWheel::TimerRefresh(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id)); } void TimerWheel::TimerCancel(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id)); }
4、5、3 Channel 與 EventLoop整合
? 注意,當(dāng)我們對(duì)一個(gè)文件描述符設(shè)置可讀事件監(jiān)控或者可寫(xiě)事件監(jiān)控時(shí),不僅僅是要設(shè)置Channel對(duì)應(yīng)的events中,因?yàn)榇藭r(shí)epoll底層實(shí)際上并沒(méi)有進(jìn)行監(jiān)控,我們還要設(shè)置到epoll模型當(dāng)中去!EventLoop中封裝了Poller,所以在這里我們直接包含EventLoop的一個(gè)指針即可。
class Poller; class EventLoop; class Channel { private: int _fd; EventLoop *_loop; uint32_t _events; // 當(dāng)前需要監(jiān)控的事件 uint32_t _revents; // 當(dāng)前連接觸發(fā)的事件 using EventCallback = std::function<void()>; EventCallback _read_callback; // 可讀事件被觸發(fā)的回調(diào)函數(shù) EventCallback _write_callback; // 可寫(xiě)事件被觸發(fā)的回調(diào)函數(shù) EventCallback _error_callback; // 錯(cuò)誤事件被觸發(fā)的回調(diào)函數(shù) EventCallback _close_callback; // 連接斷開(kāi)事件被觸發(fā)的回調(diào)函數(shù) EventCallback _event_callback; // 任意事件被觸發(fā)的回調(diào)函數(shù) public: Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) { } int Fd() { return _fd; } uint32_t Events() { return _events; } // 獲取想要監(jiān)控的事件 void SetRevents(uint32_t events) { _revents = events; } // 設(shè)置實(shí)際就緒的事件 void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; } void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; } void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; } void SetEventCallback(const EventCallback &cb) { _event_callback = cb; } // 當(dāng)前是否監(jiān)控了可讀 bool ReadAble() { return (_events & EPOLLIN); } // 當(dāng)前是否監(jiān)控了可寫(xiě) bool WriteAble() { return (_events & EPOLLOUT); } // 啟動(dòng)讀事件監(jiān)控 void EnableRead() { _events |= EPOLLIN; Update(); } // 啟動(dòng)寫(xiě)事件監(jiān)控 void EnableWrite() { _events |= EPOLLOUT; Update(); } // 關(guān)閉讀事件監(jiān)控 void DisableRead() { _events &= ~EPOLLIN; Update(); } // 關(guān)閉寫(xiě)事件監(jiān)控 void DisableWrite() { _events &= ~EPOLLOUT; Update(); } // 關(guān)閉所有事件監(jiān)控 void DisableAll() { _events = 0; Update(); } // 移除監(jiān)控 void Remove(); void Update(); // 事件處理,一旦連接觸發(fā)了事件,就調(diào)用這個(gè)函數(shù),自己觸發(fā)了什么事件如何處理自己決定 void HandleEvent() { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_event_callback) _event_callback(); /*不管任何事件,都調(diào)用的回調(diào)函數(shù)*/ if (_read_callback) _read_callback(); } /*有可能會(huì)釋放連接的操作事件,一次只處理一個(gè)*/ if (_revents & EPOLLOUT) { if (_event_callback) _event_callback(); if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); // 一旦出錯(cuò),就會(huì)釋放連接,因此要放到前邊調(diào)用任意回調(diào) } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } // if (_event_callback) _event_callback(); } }; void Channel::Update() { _loop->UpdateEvent(this); } void Channel::Remove() { _loop->RemoveEvent(this); }
4、5、3 時(shí)間輪與EventLoop整合
?EventLoop模塊可以理解就是我們上邊所說(shuō)的Reactor模塊,它是對(duì)Poller模塊,TimeWheel與定時(shí)器模塊,Socket模塊的一個(gè)整體封裝,進(jìn)行所有描述符的事件監(jiān)控。EventLoop模塊為了保證整個(gè)服務(wù)器的線程安全問(wèn)題,因此要求使用者對(duì)于Connection的所有操作一定要在其對(duì)應(yīng)的EventLoop線程內(nèi)完成,不能在其他線程中進(jìn)行(比如組作使用者使用Connection發(fā)送數(shù)據(jù),以及關(guān)閉連接這種操作)。EventLoop模塊保證自己內(nèi)部所監(jiān)控的所有描述符,都要是活躍連接,非活躍連接就要及時(shí)釋放避免資源浪費(fèi)。綜上我們整合到所有的EventLoop如下:
class EventLoop { using Functor = std::function<void()>; public: void RunAllTask() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor); } for (auto &f : functor) { f(); } return; } static int CreatEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERR_LOG("create eventfd failed"); abort(); } return efd; } void ReadEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { return; } ERR_LOG("read eventfd failed"); abort(); } return; } void WeakUpEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (errno == EINTR) { return; } ERR_LOG("write eventfd failed!"); abort(); } return; } public: EventLoop() : _thread_id(std::this_thread::get_id()) , _event_fd(CreatEventFd()) , _event_channel(new Channel(this, _event_fd)) ,_timer_wheel(this) { _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this)); _event_channel->EnableRead(); } void Start() { while (1) { std::vector<Channel *> actives; _poller.Poll(&actives); for (auto &channel : actives) { channel->HandleEvent(); } RunAllTask(); } } bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); } void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); } void RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); } return QueueInLoop(cb); } void QueueInLoop(const Functor &cb) { { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(cb); } WeakUpEventFd(); } void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); } void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); } void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); } void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); } void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); } bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); } private: std::thread::id _thread_id; // 線程id int _event_fd; std::unique_ptr<Channel> _event_channel; Poller _poller; std::vector<Functor> _tasks; std::mutex _mutex; TimerWheel _timer_wheel; };
4、6 Connection模塊
??Connection模塊,一個(gè)連接有任何的事件怎么處理都是有這個(gè)模塊來(lái)進(jìn)行處理的,因?yàn)榻M件的設(shè)計(jì)也不知道使用者要如何處理事件,因此只能是提供一些事件回到函數(shù)由使用者設(shè)置。
??Connection模塊是對(duì)Buffer模塊,Socket模塊,Channel模塊的一個(gè)整體封裝,實(shí)現(xiàn)了對(duì)一個(gè)通信套接字的整體的管理,每一個(gè)進(jìn)行數(shù)據(jù)通信的套接字(也就是accept獲取到的新連接)都會(huì)使用Connection進(jìn)行管理。
- Connection模塊內(nèi)部包含有四個(gè)由組件使用者傳入的回調(diào)函數(shù):連接建立完成回調(diào),事件回調(diào),新數(shù)據(jù)回調(diào),關(guān)閉回調(diào)。
- Connection模塊內(nèi)部包含有兩個(gè)組件使用者提供的接口:數(shù)據(jù)發(fā)送接口,連接關(guān)閉接口;
- Connection模塊內(nèi)部包含有兩個(gè)用戶態(tài)緩沖區(qū):用戶態(tài)接收緩沖區(qū),用戶態(tài)發(fā)送緩沖區(qū);
- Connection模塊內(nèi)部包含有?個(gè)Socket對(duì)象:完成描述符面向系統(tǒng)的IO操作;
- Connection模塊內(nèi)部包含有?個(gè)Channel對(duì)象:完成描述符IO事件就緒的處理。
??Connection模塊大致具體處理流程如下:
- 實(shí)現(xiàn)向Channel提供可讀,可寫(xiě),錯(cuò)誤等不同事件的IO事件回調(diào)函數(shù),然后將Channel和對(duì)應(yīng)的描述符添加到Poller事件監(jiān)控中。
- 當(dāng)描述符在Poller模塊中就緒了I0可讀事件,則調(diào)用描述符對(duì)應(yīng)Channel中保存的讀事件處理函數(shù),進(jìn)行數(shù)據(jù)讀取,將socket接收緩沖區(qū)全部讀取到Connection管理的用戶態(tài)接收緩沖區(qū)中。然后調(diào)用由組件使用者傳入的新數(shù)據(jù)到來(lái)回調(diào)函數(shù)進(jìn)行處理。
- 組件使用者進(jìn)行數(shù)據(jù)的業(yè)務(wù)處理完畢后,通過(guò)Connection向使用者提供的數(shù)據(jù)發(fā)送接口,將數(shù)據(jù)寫(xiě)入Connection的發(fā)送緩沖區(qū)中。
- 啟動(dòng)描述符在Poll模塊中的IO寫(xiě)事件監(jiān)控,就緒后。調(diào)用Channel中保存的寫(xiě)事件處理函數(shù),將發(fā)送緩沖區(qū)中的數(shù)據(jù)通過(guò)Socket進(jìn)行面向系統(tǒng)的實(shí)際數(shù)據(jù)發(fā)送。
? 綜上我們?cè)賮?lái)設(shè)計(jì)Connection模塊的功能就很簡(jiǎn)單了。具體如下:
- 套接字的管理,能夠進(jìn)行套接字的操作!
- 連接事件的管理,可讀,可寫(xiě),錯(cuò)誤,掛斷,任意!
- 緩沖區(qū)管理:把socket讀取的數(shù)據(jù)放進(jìn)緩沖區(qū),要有輸入緩沖區(qū)和輸出緩沖區(qū)管理!
- 協(xié)議上下文的管理,記錄請(qǐng)求數(shù)據(jù)的處理過(guò)程!
- 啟動(dòng)或者取消非活躍連接超時(shí)銷(xiāo)毀功能!
- 回調(diào)函數(shù)的管理:因?yàn)檫B接收到數(shù)據(jù)之后該如何處理,需要由用戶決定,必須要有業(yè)務(wù)處理函數(shù)!一個(gè)連接建立成功后,應(yīng)該如何處理,由用戶決定,因此必須有連接建立成功的回調(diào)函數(shù)!一個(gè)連接關(guān)閉前,該如何處理,有用戶決定,因此必須有關(guān)閉連接回調(diào)函數(shù)!任何事件的產(chǎn)生,有沒(méi)有某些處理,由用戶決定,因此必須任意事件的回調(diào)函數(shù)!
? 其實(shí)Connection模塊都是當(dāng)服務(wù)器接收到新連接后,為新連接創(chuàng)建的。當(dāng)來(lái)一個(gè)新連接時(shí),我們?yōu)槠鋭?chuàng)建一個(gè)Connection,其中就包括了channel對(duì)新連接設(shè)置就緒事件觸發(fā)后的各種回調(diào)處理函數(shù),還有buffer維護(hù)的接受和發(fā)送緩沖區(qū)。當(dāng)設(shè)置完回調(diào)后,我們就可以把新連接設(shè)置成監(jiān)控讀事件狀態(tài)。如果客服端發(fā)送信息了,我們服務(wù)器就會(huì)把信息讀到接收緩沖區(qū)當(dāng)中。然后再調(diào)用所設(shè)置用戶的回調(diào)處理函數(shù)(業(yè)務(wù)處理)。最后再將數(shù)據(jù)寫(xiě)入發(fā)送緩沖區(qū)進(jìn)行發(fā)送。通過(guò)上面我們可以看到,Connection模塊就是對(duì)一個(gè)連接的所有操作進(jìn)行了封裝管理。
? 但是還有一個(gè)特殊場(chǎng)景:對(duì)連接進(jìn)行操作的時(shí)候,對(duì)于連接以及被釋放,導(dǎo)致內(nèi)存訪問(wèn)錯(cuò)誤,最終程序崩潰!
? 解決方案:使用智能指針share_ptr 對(duì)Connect 對(duì)象進(jìn)行管理,這樣可以保證任意一個(gè)地方對(duì)Connect對(duì)象進(jìn)行操作的時(shí)候,保存了一分share_ptr,因此就算其他地方進(jìn)行了釋放,也只是對(duì)share_ptr的計(jì)數(shù)器-1,而不會(huì)導(dǎo)致Connection的實(shí)際釋放!? 該模塊具體實(shí)現(xiàn)如下:
class Connection; typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING} ConnStatu; using PtrConnection = std::shared_ptr<Connection>; class Connection : public std::enable_shared_from_this<Connection> { using ConnectedCallback = std::function<void(const PtrConnection &)>; using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection &)>; using AnyEventCallback = std::function<void(const PtrConnection &)>; private: void HandleRead() { char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, 65535); if(ret < 0) { // 出錯(cuò)了并不會(huì)直接關(guān)閉連接,而是需要先處理一下緩沖區(qū)中的數(shù)據(jù) return ShutdownInLoop(); } _in_buffer.WriteAndPush(buf, ret); if(_in_buffer.ReadAbleSize() > 0) { return _message_callback(shared_from_this(), &_in_buffer); } } void HandleWrite() { ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPostion(), _out_buffer.ReadAbleSize()); if(ret < 0) { if(_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } // 發(fā)送完數(shù)據(jù)后,都指針向后偏移 _out_buffer.MoveReadOffset(ret); if(_out_buffer.ReadAbleSize() == 0) { _channel.DisableWrite(); if(_statu == DISCONNECTING) { return Release(); } } return; } void HandleClose() { if(_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } void HandleError() { return HandleClose(); } void HandleEvent() { if(_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); } if(_event_callback) { _event_callback(shared_from_this()); } } // 連接獲取之后,所處的狀態(tài)下要進(jìn)行各種設(shè)置(啟動(dòng)都監(jiān)控,調(diào)用回調(diào)函數(shù)) void EstablishedInLoop() { assert(_statu == CONNECTING); _statu = CONNECTED; _channel.EnableRead(); if(_connected_callback) { _connected_callback(shared_from_this()); } } // 真正釋放接口 void ReleaseInLoop() { // 1.修改狀態(tài) _statu = DISCONNECTED; // 2.移除事件監(jiān)控 _channel.Remove(); // 3.關(guān)閉描述符 _socket.Close(); // 4.如果當(dāng)前定時(shí)器隊(duì)列中還有定時(shí)銷(xiāo)毀任務(wù),則取消任務(wù) if(_loop->HasTimer(_conn_id)) { CancelInactiveReleaseInLoop(); } // 5.調(diào)用關(guān)閉回調(diào)函數(shù),避免先移除服務(wù)器管理的連接信息導(dǎo)致Connection被釋放,再去處理會(huì)出錯(cuò),因此先調(diào)用用戶的回調(diào)函數(shù) if(_closed_callback) { _closed_callback(shared_from_this()); } // 6.移除服務(wù)器內(nèi)部管理的連接信息 if(_server_closed_callback) { _server_closed_callback(shared_from_this()); } } // 這個(gè)接口并不是實(shí)際發(fā)送數(shù)據(jù)的,而是把數(shù)據(jù)放到發(fā)送緩沖區(qū),啟動(dòng)可寫(xiě)事件監(jiān)控 void SendInLoop(Buffer& buf) { if(_statu == DISCONNECTED) { return; } _out_buffer.WriteBufferAndPush(buf); if(_channel.WriteAble() == false) { _channel.EnableWrite(); } } //這個(gè)關(guān)閉操作并非實(shí)際的連接釋放操作,需要判斷還有沒(méi)有數(shù)據(jù)待處理,待發(fā)送 void ShutdownInLoop() { _statu = DISCONNECTING; if(_in_buffer.ReadAbleSize() > 0) { if(_message_callback) _message_callback(shared_from_this(), &_in_buffer); } //要么就是寫(xiě)入數(shù)據(jù)的時(shí)候出錯(cuò)關(guān)閉,要么就是沒(méi)有待發(fā)送數(shù)據(jù),直接關(guān)閉 if(_out_buffer.ReadAbleSize() > 0) { if(_channel.WriteAble() == false) _channel.EnableWrite(); } if(_out_buffer.ReadAbleSize() == 0) { Release(); } } //啟動(dòng)非活躍連接超時(shí)釋放規(guī)則 void EnableInactiveReleaseInLoop(int sec) { //1. 將判斷標(biāo)志 _enable_inactive_release 置為true _enable_inactive_release = true; //2. 如果當(dāng)前定時(shí)銷(xiāo)毀任務(wù)已經(jīng)存在,那就刷新延遲一下即可 if(_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } //3. 如果不存在定時(shí)銷(xiāo)毀任務(wù),則新增 _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this)); } void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if(_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); } } void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _context = context; _connected_callback = conn; _message_callback = msg; _closed_callback = closed; _event_callback = event; } public: Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id) , _sockfd(sockfd) , _enable_inactive_release(false) , _loop(loop) , _statu(CONNECTING) , _socket(_sockfd) , _channel(loop, _sockfd) { _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this)); _channel.SetReadCallback(std::bind(&Connection::HandleRead, this)); _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this)); _channel.SetErrorCallback(std::bind(&Connection::HandleError, this)); } ~Connection() { DBG_LOG("release connction:%p", this); } int Fd() { return _sockfd; } int Id() { return _conn_id; } bool Connected() { return _statu == CONNECTED; } // 設(shè)置上下文 -- 建立連接完成后進(jìn)行調(diào)用 void SetContext(const Any &context) { _context = context; } Any* GetContext() { return &_context; } void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; } // 連接建立完成后,進(jìn)行channel回調(diào)設(shè)置,啟動(dòng)都監(jiān)控,調(diào)用_connected_callback void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); } //發(fā)送數(shù)據(jù),將數(shù)據(jù)放到發(fā)送緩沖區(qū),啟動(dòng)寫(xiě)事件監(jiān)控 void Send(const char* data, size_t len) { //外界傳入的data,可能是個(gè)臨時(shí)的空間,我們現(xiàn)在只是把發(fā)送操作壓入了任務(wù)池,有可能并沒(méi)有被立即執(zhí)行 //因此有可能執(zhí)行的時(shí)候,data指向的空間有可能已經(jīng)被釋放了。 Buffer buf; buf.WriteAndPush(data, len); _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf))); } // 提供給組件使用者的關(guān)閉接口--并不實(shí)際關(guān)閉,需要判斷有沒(méi)有數(shù)據(jù)待處理 void Shutdown() { _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); } void Release() { _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this)); } // 啟動(dòng)非活躍銷(xiāo)毀,并定義多長(zhǎng)時(shí)間無(wú)通信就是非活躍,添加定時(shí)任務(wù) void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); } // 取消非活躍銷(xiāo)毀 void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this)); } // 切換協(xié)議---重置上下文以及階段性回調(diào)處理函數(shù) -- 而是這個(gè)接口必須在EventLoop線程中立即執(zhí)行 // 防備新的事件觸發(fā)后,處理的時(shí)候,切換任務(wù)還沒(méi)有被執(zhí)行--會(huì)導(dǎo)致數(shù)據(jù)使用原協(xié)議處理了。 void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _loop->AssertInLoop(); _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event)); } private: uint64_t _conn_id; // 連接的唯一ID,便于連接的管理和查找 // uint64_t _timer_id; //定時(shí)器ID,必須是唯一的,這塊為了簡(jiǎn)化操作使用conn_id作為定時(shí)器ID int _sockfd; // 連接關(guān)聯(lián)的文件描述符 bool _enable_inactive_release; // 連接是否啟動(dòng)非活躍銷(xiāo)毀的判斷標(biāo)志,默認(rèn)為false EventLoop *_loop; // 連接所關(guān)聯(lián)的一個(gè)EventLoop ConnStatu _statu; // 連接狀態(tài) Socket _socket; // 套接字操作管理 Channel _channel; // 連接的事件管理 Buffer _in_buffer; // 輸入緩沖區(qū)---存放從socket中讀取到的數(shù)據(jù) Buffer _out_buffer; // 輸出緩沖區(qū)---存放要發(fā)送給對(duì)端的數(shù)據(jù) Any _context; // 請(qǐng)求的接收處理上下文 /*這四個(gè)回調(diào)函數(shù),是讓服務(wù)器模塊來(lái)設(shè)置的(其實(shí)服務(wù)器模塊的處理回調(diào)也是組件使用者設(shè)置的)*/ /*換句話說(shuō),這幾個(gè)回調(diào)都是組件使用者使用的*/ ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; /*組件內(nèi)的連接關(guān)閉回調(diào)--組件內(nèi)設(shè)置的,因?yàn)榉?wù)器組件內(nèi)會(huì)把所有的連接管理起來(lái),一旦某個(gè)連接要關(guān)閉*/ /*就應(yīng)該從管理的地方移除掉自己的信息*/ ClosedCallback _server_closed_callback; };
4、7 Acceptor模塊
? 上述的Connection模塊是對(duì)通信連接的所有操作管理,Acceptor模塊就是對(duì)連接套接字管理。Acceptor模塊是對(duì)Socket模塊,Channel模塊的一個(gè)整體封裝,實(shí)現(xiàn)了對(duì)一個(gè)監(jiān)聽(tīng)套接字的整體的管理。
- Acceptor模塊內(nèi)部包含有一個(gè)Socket對(duì)象:實(shí)現(xiàn)監(jiān)聽(tīng)套接字的操作;
- Acceptor模塊內(nèi)部包含有一個(gè)Channel對(duì)象:實(shí)現(xiàn)監(jiān)聽(tīng)套接字IO事件就緒的處理具體處理流程如下:
- 實(shí)現(xiàn)向Channel提供可讀事件的IO事件處理回調(diào)函數(shù),函數(shù)的功能其實(shí)也就是獲取新連接;
- 為新連接構(gòu)建一個(gè)Connection對(duì)象出來(lái)。
? 當(dāng)獲取了一個(gè)新建連接的描述符后,需要為這個(gè)通信連接,封裝一個(gè)connection對(duì)象,設(shè)置不同回調(diào)。注意:因?yàn)锳cceptor模塊本身并不知道一個(gè)鏈接產(chǎn)生了某個(gè)事件該如何處理,因此獲取一個(gè)通信連接后,Connection的封裝,以及事件回調(diào)的設(shè)置都應(yīng)該由服務(wù)器模塊來(lái)進(jìn)行!
? 具體實(shí)現(xiàn)代碼如下:
class Acceptor { using AcceptCallback = std::function<void(int)>; private: void HandleRead() { int newfd = _socket.Accept(); if(newfd < 0) return ; if(_accept_callback) _accept_callback(newfd); } int CreateServer(int port) { bool ret = _socket.CreateServer(port); assert(ret); return _socket.Fd(); } public: Acceptor(EventLoop* loop, int port) : _socket(CreateServer(port)) , _loop(loop) , _channel(_loop, _socket.Fd()) { /*不能將啟動(dòng)讀事件監(jiān)控,放到構(gòu)造函數(shù)中,必須在設(shè)置回調(diào)函數(shù)后,再去啟動(dòng)*/ /*否則有可能造成啟動(dòng)監(jiān)控后,立即有事件,處理的時(shí)候,回調(diào)函數(shù)還沒(méi)設(shè)置:新連接得不到處理,且資源泄漏*/ _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this)); } void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; } void Listen() { _channel.EnableRead(); } private: Socket _socket; // 創(chuàng)建監(jiān)聽(tīng)套接字 EventLoop *_loop; // 對(duì)監(jiān)聽(tīng)套接字進(jìn)行事件監(jiān)控 Channel _channel; // 對(duì)監(jiān)聽(tīng)套接字進(jìn)行事件管理 AcceptCallback _accept_callback; // 對(duì)新連接進(jìn)行管理 };
4、8 LoopThread模塊
? 上述我們講到EventLoop模塊是與線程一一對(duì)應(yīng)的,但是怎么保證一個(gè)線程和一個(gè)EvenLoop一一對(duì)應(yīng)起來(lái)呢?我們?cè)撃K就是將線程與EventLoop結(jié)合起來(lái)。EventLoop模塊實(shí)例化的對(duì)象,在構(gòu)造的時(shí)候就會(huì)初始化_thread_id,而后邊當(dāng)運(yùn)行一個(gè)操作的時(shí)候判斷當(dāng)前是否運(yùn)行在eventLoop模塊對(duì)應(yīng)的線程中,就是將線程ID與EventLoop模塊中的thread_id進(jìn)行一個(gè)比較,相同就表示在同一個(gè)線程,不同就表示當(dāng)前運(yùn)行線程并不是EventLoop線程。
? 具體就是EventLoop模塊在實(shí)例化對(duì)象的時(shí)候,必須在線程內(nèi)部。因?yàn)镋ventLoop實(shí)例化對(duì)象時(shí)會(huì)設(shè)置自己的thread_id,如果我們先創(chuàng)建了多個(gè)EventLoop對(duì)象,然后創(chuàng)建了多個(gè)線程,將各個(gè)線程的id,重新給EventLoop進(jìn)行設(shè)置存在問(wèn)題:在構(gòu)造EventLoop對(duì)象,到設(shè)置新的thread_id期間將是不可控的。因此我們必須先創(chuàng)建線程,然后在線程的入口函數(shù)中,去實(shí)例化EventLoop對(duì)象。
? 該模塊總結(jié)下來(lái)就是將eventloop模塊和線程整合起來(lái),對(duì)外提供的功能:
- 創(chuàng)建線程;
- 在線程中實(shí)例化 eventloop 對(duì)象;
- 可以向外部返回實(shí)例化的eventloop。
? 下面我們看一下該模塊的實(shí)現(xiàn)代碼:
class LoopThread { private: void ThreadEntry() { EventLoop loop; { std::unique_lock<std::mutex> lock(_mutex); _loop = &loop; _cond.notify_all(); } loop.Start(); } public: LoopThread() : _loop(nullptr) , _thread(std::thread(&LoopThread::ThreadEntry, this)) {} EventLoop *GetLoop() { EventLoop *loop = nullptr; { std::unique_lock<std::mutex> lock(_mutex); _cond.wait(lock, [&](){ return _loop != nullptr; }); loop = _loop; } return _loop; } private: std::mutex _mutex; std::condition_variable _cond; EventLoop* _loop; std::thread _thread; };
4、9 LoopThreadPool模塊
? LoopThreadPool模塊就是對(duì)所有的LoopThread進(jìn)行管理及分配。其功能:
- 線程數(shù)量可配置(0個(gè)或多個(gè))。注意事項(xiàng):在服務(wù)器中,主從Reactor模型是主線程只負(fù)責(zé)新連接獲取,從屬線程負(fù)責(zé)新連接的事件監(jiān)控及處理。因此當(dāng)前的線程池,有可能從屬線程會(huì)數(shù)量為0,也就是實(shí)現(xiàn)單Reactor服務(wù)器,一個(gè)線程及負(fù)責(zé)獲取連接,也負(fù)責(zé)連接的處理。
- 對(duì)所有的線程進(jìn)行管理,其實(shí)就是管理0個(gè)或多個(gè)LoopThread對(duì)象。
- 提供線程分配的功能。當(dāng)主線程獲取了一個(gè)新連接,需要將新連接掛到從屬線程上進(jìn)行事件監(jiān)控及處理。假設(shè)有0個(gè)從屬線程,則直接分配給主線程的EventLoop,進(jìn)行處理。假設(shè)有多個(gè)從屬線程,則采用RR輪轉(zhuǎn)思想,進(jìn)行線程的分配(將對(duì)應(yīng)線程的EventLoop獲取到,設(shè)置給對(duì)應(yīng)的Connection)。
? 下面我們直接看代碼一起理解一下。
class LoopThreadPool { public: LoopThreadPool(EventLoop *baseloop) : _thread_count(0) , _next_idx(0) , _baseloop(baseloop) {} void SetThreadCount(int count) { _thread_count = count; } void Create() { if(_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for(int i = 0; i < _thread_count; i++) { _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); } } return ; } EventLoop *NextLoop() { if(_thread_count == 0) return _baseloop; _next_idx = (_next_idx + 1) % _thread_count; return _loops[_next_idx]; } private: int _thread_count; int _next_idx; EventLoop *_baseloop; std::vector<LoopThread*> _threads; std::vector<EventLoop*> _loops; };
4、10 TcpServer模塊
? 上述我們就已經(jīng)實(shí)現(xiàn)了高并發(fā)服務(wù)器的所有模塊。這個(gè)模塊就是將上述的所有模塊進(jìn)行了整合,通過(guò) Tcpserver 模塊實(shí)例化的對(duì)象,可以非常簡(jiǎn)單的完成一個(gè)服務(wù)器的搭建。
??Tcpserver 模塊主要管理的對(duì)象:
- Acceptor對(duì)象,創(chuàng)建一個(gè)監(jiān)聽(tīng)套接字!
- EventLoop 對(duì)象,baseloop對(duì)象,實(shí)現(xiàn)對(duì)監(jiān)聽(tīng)套接字的事件監(jiān)控!
- std::vector conns,實(shí)現(xiàn)對(duì)新建連接的管理!
- LoopThreadPool 對(duì)象,創(chuàng)建loop線程池,對(duì)新建連接進(jìn)行事件監(jiān)控和處理!
? 該模塊搭建服務(wù)器的主要流程:
- 在TcpServer中實(shí)例一個(gè)Acceptor對(duì)象,以及一個(gè)EventLoop 對(duì)象(baseloop);
- 將Acceptor 掛在baseloop 進(jìn)行事件監(jiān)控;
- 一旦Acceptor 對(duì)象就緒了可讀事件,則執(zhí)行時(shí)間回調(diào)函數(shù)獲取新建連接;
- 對(duì)新連接,創(chuàng)造一個(gè) Connection 進(jìn)行管理;
- 對(duì)新連接對(duì)應(yīng)的 Connection 設(shè)置功能回調(diào) (連接完成回調(diào),消息回調(diào),關(guān)閉回調(diào),任意事件監(jiān)控?。?;
- 啟動(dòng)Connection 的非活躍鏈接的超時(shí)銷(xiāo)毀功能
- 將新連接對(duì)應(yīng)的Connection 掛到 LoopThreadPool 中的叢書(shū)線程對(duì)應(yīng)的Eventloop 中進(jìn)行事件監(jiān)控!
- 一旦Connection對(duì)應(yīng)的鏈接就緒了可讀事件,則這個(gè)時(shí)候執(zhí)行讀事件回調(diào)函數(shù),讀取數(shù)據(jù),讀取完畢后調(diào)用TcpServer設(shè)置的消息回調(diào)!
? 那我們?cè)趯?shí)現(xiàn)的時(shí)候就可以主要實(shí)現(xiàn)以下功能:
- 設(shè)置從屬線程池?cái)?shù)量;
- 啟動(dòng)服務(wù)器;
- 設(shè)置各種回調(diào)函數(shù)。(連接建立完成,消息,關(guān)閉,任意) 用戶設(shè)置給TcpServer TcpServer設(shè)置獲取的新連接;
- 是否啟動(dòng)非活躍連接超時(shí)銷(xiāo)毀功能;
- 添加任務(wù)。
? 我們看如下實(shí)現(xiàn)代碼:
class TcpServer { using ConnectedCallback = std::function<void(const PtrConnection&)>; using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection&)>; using AnyEventCallback = std::function<void(const PtrConnection&)>; using Functor = std::function<void()>; private: void NewConnection(int fd) { _next_id++; PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)); conn->SetMessageCallback(_message_callback); conn->SetClosedCallback(_closed_callback); conn->SetConnectedCallback(_connected_callback); conn->SetAnyEventCallback(_event_callback); conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1)); if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout); // 啟動(dòng)非活躍超時(shí)銷(xiāo)毀 conn->Established(); // 就緒初始化 _conns.insert(std::make_pair(_next_id, conn)); } void RemoveConnectionInLoop(const PtrConnection &conn) { int id = conn->Id(); auto it = _conns.find(id); if (it != _conns.end()) { _conns.erase(it); } } // 從管理Connection的_conns中移除連接信息 void RemoveConnection(const PtrConnection &conn) { _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn)); } void RunAfterInLoop(const Functor &task, int delay) { _next_id++; _baseloop.TimerAdd(_next_id, delay, task); } public: TcpServer(int port) : _port(port) , _next_id(0) , _enable_inactive_release(false) , _acceptor(&_baseloop, port) , _pool(&_baseloop) { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen(); } void SetThreadCount(int count) { return _pool.SetThreadCount(count); } void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; } // 添加一個(gè)定時(shí)任務(wù) void RunAfter(const Functor& task, int delay) { _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay)); } void Start() { _pool.Create(); _baseloop.Start(); } private: uint64_t _next_id; int _port; int _timeout; bool _enable_inactive_release; EventLoop _baseloop; Acceptor _acceptor; LoopThreadPool _pool; std::unordered_map<uint64_t, PtrConnection> _conns; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; };
4、11 測(cè)試代碼
? 有了TcpSerevr模塊,我們就可以很好的搭建出一個(gè)服務(wù)器了。我們只需要設(shè)置服務(wù)器觸發(fā)IO事件后的回調(diào)即可!具體測(cè)試服務(wù)器代碼如下:
#include "../Server.hpp" class EchoServer { private: TcpServer _server; private: void OnConnected(const PtrConnection &conn) { DBG_LOG("NEW CONNECTION:%p", conn.get()); } void OnClosed(const PtrConnection &conn) { DBG_LOG("CLOSE CONNECTION:%p", conn.get()); } void OnMessage(const PtrConnection &conn, Buffer *buf) { conn->Send(buf->ReadPostion(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); conn->Shutdown(); } public: EchoServer(int port):_server(port) { _server.SetThreadCount(2); _server.EnableInactiveRelease(10); _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1)); _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void Start() { _server.Start(); } };
五、HTTP協(xié)議支持實(shí)現(xiàn)
??HTTP協(xié)議模塊用于對(duì)高并發(fā)服務(wù)器模塊進(jìn)行協(xié)議支持,基于提供的協(xié)議支持能夠更方便的完成指定協(xié)議服務(wù)器的搭建。而HTTP協(xié)議支持模塊的實(shí)現(xiàn),可以細(xì)分為下述幾個(gè)小節(jié)的模塊。
5、1 Util模塊
??這個(gè)模塊是一個(gè)工具模塊,主要提供HTTP協(xié)議模塊所用到的一些工具函數(shù),比如url編解碼,文件讀寫(xiě)....等。其主要提供的功能如下:
- 讀取文件內(nèi)容;
- 向文件寫(xiě)入內(nèi)容;
- URL編碼;
- URL解碼;
- 通過(guò)HTTP狀態(tài)碼獲取描述信息;
- 通過(guò)文件后綴名獲取mime;
- 判斷一個(gè)文件是不是目錄;
- 判斷一個(gè)文件是否是一個(gè)普通文件;
- HTTP資源路徑有效性判斷;
? 該模塊其中的實(shí)現(xiàn),可以說(shuō)是對(duì)零碎的功能進(jìn)行了整合。具體實(shí)現(xiàn)代碼如下:
std::unordered_map<int, std::string> _statu_msg = { {100, "Continue"}, {101, "Switching Protocol"}, {102, "Processing"}, {103, "Early Hints"}, {200, "OK"}, {201, "Created"}, {202, "Accepted"}, {203, "Non-Authoritative Information"}, {204, "No Content"}, {205, "Reset Content"}, {206, "Partial Content"}, {207, "Multi-Status"}, {208, "Already Reported"}, {226, "IM Used"}, {300, "Multiple Choice"}, {301, "Moved Permanently"}, {302, "Found"}, {303, "See Other"}, {304, "Not Modified"}, {305, "Use Proxy"}, {306, "unused"}, {307, "Temporary Redirect"}, {308, "Permanent Redirect"}, {400, "Bad Request"}, {401, "Unauthorized"}, {402, "Payment Required"}, {403, "Forbidden"}, {404, "Not Found"}, {405, "Method Not Allowed"}, {406, "Not Acceptable"}, {407, "Proxy Authentication Required"}, {408, "Request Timeout"}, {409, "Conflict"}, {410, "Gone"}, {411, "Length Required"}, {412, "Precondition Failed"}, {413, "Payload Too Large"}, {414, "URI Too Long"}, {415, "Unsupported Media Type"}, {416, "Range Not Satisfiable"}, {417, "Expectation Failed"}, {418, "I'm a teapot"}, {421, "Misdirected Request"}, {422, "Unprocessable Entity"}, {423, "Locked"}, {424, "Failed Dependency"}, {425, "Too Early"}, {426, "Upgrade Required"}, {428, "Precondition Required"}, {429, "Too Many Requests"}, {431, "Request Header Fields Too Large"}, {451, "Unavailable For Legal Reasons"}, {501, "Not Implemented"}, {502, "Bad Gateway"}, {503, "Service Unavailable"}, {504, "Gateway Timeout"}, {505, "HTTP Version Not Supported"}, {506, "Variant Also Negotiates"}, {507, "Insufficient Storage"}, {508, "Loop Detected"}, {510, "Not Extended"}, {511, "Network Authentication Required"} }; std::unordered_map<std::string, std::string> _mime_msg = { {".aac", "audio/aac"}, {".abw", "application/x-abiword"}, {".arc", "application/x-freearc"}, {".avi", "video/x-msvideo"}, {".azw", "application/vnd.amazon.ebook"}, {".bin", "application/octet-stream"}, {".bmp", "image/bmp"}, {".bz", "application/x-bzip"}, {".bz2", "application/x-bzip2"}, {".csh", "application/x-csh"}, {".css", "text/css"}, {".csv", "text/csv"}, {".doc", "application/msword"}, {".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}, {".eot", "application/vnd.ms-fontobject"}, {".epub", "application/epub+zip"}, {".gif", "image/gif"}, {".htm", "text/html"}, {".html", "text/html"}, {".ico", "image/vnd.microsoft.icon"}, {".ics", "text/calendar"}, {".jar", "application/java-archive"}, {".jpeg", "image/jpeg"}, {".jpg", "image/jpeg"}, {".js", "text/javascript"}, {".json", "application/json"}, {".jsonld", "application/ld+json"}, {".mid", "audio/midi"}, {".midi", "audio/x-midi"}, {".mjs", "text/javascript"}, {".mp3", "audio/mpeg"}, {".mpeg", "video/mpeg"}, {".mpkg", "application/vnd.apple.installer+xml"}, {".odp", "application/vnd.oasis.opendocument.presentation"}, {".ods", "application/vnd.oasis.opendocument.spreadsheet"}, {".odt", "application/vnd.oasis.opendocument.text"}, {".oga", "audio/ogg"}, {".ogv", "video/ogg"}, {".ogx", "application/ogg"}, {".otf", "font/otf"}, {".png", "image/png"}, {".pdf", "application/pdf"}, {".ppt", "application/vnd.ms-powerpoint"}, {".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"}, {".rar", "application/x-rar-compressed"}, {".rtf", "application/rtf"}, {".sh", "application/x-sh"}, {".svg", "image/svg+xml"}, {".swf", "application/x-shockwave-flash"}, {".tar", "application/x-tar"}, {".tif", "image/tiff"}, {".tiff", "image/tiff"}, {".ttf", "font/ttf"}, {".txt", "text/plain"}, {".vsd", "application/vnd.visio"}, {".wav", "audio/wav"}, {".weba", "audio/webm"}, {".webm", "video/webm"}, {".webp", "image/webp"}, {".woff", "font/woff"}, {".woff2", "font/woff2"}, {".xhtml", "application/xhtml+xml"}, {".xls", "application/vnd.ms-excel"}, {".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}, {".xml", "application/xml"}, {".xul", "application/vnd.mozilla.xul+xml"}, {".zip", "application/zip"}, {".3gp", "video/3gpp"}, {".3g2", "video/3gpp2"}, {".7z", "application/x-7z-compressed"} }; class Util { public: static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string>* arry) { size_t offset = 0; while(offset < src.size()) { size_t pos = src.find(sep, offset); if(pos == std::string::npos) { if(pos == src.size()) break; arry->push_back(src.substr(offset)); return arry->size(); } if(pos == offset) { offset = offset + sep.size(); continue; } arry->push_back(src.substr(offset, pos - offset)); offset = pos + sep.size(); } return arry->size(); } static bool ReadFile(const std::string &filename, std::string *buf) { std::ifstream ifs(filename.c_str(), std::ios::binary); if(ifs.is_open() == false) { printf("open %s file failed", filename.c_str()); return false; } size_t fsize = 0; ifs.seekg(0, ifs.end); fsize = ifs.tellg(); ifs.seekg(0, ifs.beg); buf->resize(fsize); ifs.read(&(*buf)[0], fsize); if(ifs.good() == false) { printf("read %s file failed", filename.c_str()); ifs.close(); return false; } ifs.close(); return true; } static bool WriteFile(const std::string &filename, const std::string &buf) { std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); if(ofs.is_open() == false) { printf("open %s file failed", filename.c_str()); return false; } ofs.write(buf.c_str(), buf.size()); if(ofs.good() == false) { printf("write %s file failed", filename.c_str()); ofs.close(); return false; } ofs.close(); return true; } // URL編碼,避免URL中資源路徑與查詢字符串中的特殊字符與HTTP請(qǐng)求中特殊字符產(chǎn)生歧義 // 編碼格式:將特殊字符的ascii值,轉(zhuǎn)換為兩個(gè)16進(jìn)制字符,前綴% C++ -> C%2B%2B // 不編碼的特殊字符: RFC3986文檔規(guī)定 . - _ ~ 字母,數(shù)字屬于絕對(duì)不編碼字符 // RFC3986文檔規(guī)定,編碼格式 %HH // W3C標(biāo)準(zhǔn)中規(guī)定,查詢字符串中的空格,需要編碼為+, 解碼則是+轉(zhuǎn)空格 static std::string UrlEncode(const std::string url, bool convert_space_to_plus) { std::string res; for (auto &c : url) { if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c)) { res += c; continue; } if (c == ' ' && convert_space_to_plus == true) { res += '+'; continue; } // 剩下的字符都是需要編碼成為 %HH 格式 char tmp[4] = {0}; // snprintf 與 printf比較類(lèi)似,都是格式化字符串,只不過(guò)一個(gè)是打印,一個(gè)是放到一塊空間中 snprintf(tmp, sizeof tmp, "%%%02X", c); res += tmp; } return res; } static char HEXTOI(char c) { if (c >= '0' && c <= '9') { return c - '0'; } else if (c >= 'a' && c <= 'z') { return c - 'a' + 10; } else if (c >= 'A' && c <= 'Z') { return c - 'A' + 10; } return -1; } static std::string UrlDecode(const std::string url, bool convert_plus_to_space) { // 遇到了%,則將緊隨其后的2個(gè)字符,轉(zhuǎn)換為數(shù)字,第一個(gè)數(shù)字左移4位,然后加上第二個(gè)數(shù)字 + -> 2b %2b->2 << 4 + 11 std::string res; for (int i = 0; i < url.size(); i++) { if (url[i] == '+' && convert_plus_to_space == true) { res += ' '; continue; } if (url[i] == '%' && (i + 2) < url.size()) { char v1 = HEXTOI(url[i + 1]); char v2 = HEXTOI(url[i + 2]); char v = v1 * 16 + v2; res += v; i += 2; continue; } res += url[i]; } return res; } // 響應(yīng)狀態(tài)碼的描述信息獲取 static std::string StatuDesc(int statu) { auto it = _statu_msg.find(statu); if (it != _statu_msg.end()) { return it->second; } return "Unknow"; } // 根據(jù)文件后綴名獲取文件mime static std::string ExtMime(const std::string &filename) { // a.b.txt 先獲取文件擴(kuò)展名 size_t pos = filename.find_last_of('.'); if (pos == std::string::npos) { return "application/octet-stream"; } // 根據(jù)擴(kuò)展名,獲取mime std::string ext = filename.substr(pos); auto it = _mime_msg.find(ext); if (it == _mime_msg.end()) { return "application/octet-stream"; } return it->second; } static bool IsDirectory(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if(ret < 0) { return false; } return S_ISDIR(st.st_mode); } static bool IsRegular(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if(ret < 0) { return false; } return S_ISREG(st.st_mode); } // http請(qǐng)求的資源路徑有效性判斷 // /index.html --- 前邊的/叫做相對(duì)根目錄 映射的是某個(gè)服務(wù)器上的子目錄 // 想表達(dá)的意思就是,客戶端只能請(qǐng)求相對(duì)根目錄中的資源,其他地方的資源都不予理會(huì) // /../login, 這個(gè)路徑中的..會(huì)讓路徑的查找跑到相對(duì)根目錄之外,這是不合理的,不安全的 static bool ValidPath(const std::string &path) { // 思想:按照/進(jìn)行路徑分割,根據(jù)有多少子目錄,計(jì)算目錄深度,有多少層,深度不能小于0 std::vector<std::string> subdir; Split(path, "/", &subdir); int level = 0; for (auto &dir : subdir) { if (dir == "..") { level--; // 任意一層走出相對(duì)根目錄,就認(rèn)為有問(wèn)題 if (level < 0) return false; continue; } level++; } return true; } };
? 注意,這里的狀態(tài)碼和對(duì)應(yīng)的文件名后綴名獲取mime都是固定的一一對(duì)應(yīng)的。我們只需要用一個(gè)hash表將他們存儲(chǔ)起來(lái),然后又來(lái)狀態(tài)碼或者文件后綴名去對(duì)應(yīng)的表中查找即可。
? 對(duì)URL的編碼和解碼都是有固定的,編碼格式:
- URL編碼,避免URL中資源路徑與查詢字符串中的特殊字符與HTTP請(qǐng)求中特殊字符產(chǎn)生歧義
- 編碼格式:將特殊字符的ascii值,轉(zhuǎn)換為兩個(gè)16進(jìn)制字符,前綴% ? C++ -> C%2B%2B
- 不編碼的特殊字符: RFC3986文檔規(guī)定 . - _ ~ 字母,數(shù)字屬于絕對(duì)不編碼字符;
- RFC3986文檔規(guī)定,編碼格式 %HH;
- W3C標(biāo)準(zhǔn)中規(guī)定,查詢字符串中的空格,需要編碼為+, 解碼則是+轉(zhuǎn)空格;
? 在解碼的時(shí)候遇到了%,則將緊隨其后的2個(gè)字符,轉(zhuǎn)換為數(shù)字,第一個(gè)數(shù)字左移4位,然后加上第二個(gè)數(shù)字。例如:%2b->2 << 4 + 11。
? 為什么還要判斷Http請(qǐng)求資源有效性呢?例如:/index.html ,前邊的 / 叫做相對(duì)根目錄,映射的是某個(gè)服務(wù)器上的子目錄??蛻舳酥荒苷?qǐng)求相對(duì)根目錄中的資源,其他地方的資源都不予理會(huì)。例如這種情況:?/../login, 這個(gè)路徑中的..會(huì)讓路徑的查找跑到相對(duì)根目錄之外,這是不合理的,不安全的。
5、2 HttpRequest模塊
??這個(gè)模塊是HTTP請(qǐng)求數(shù)據(jù)模塊,用于保存HTTP請(qǐng)求數(shù)據(jù)被解析后的各項(xiàng)請(qǐng)求元素信息。HTTP的請(qǐng)求格式我們就不再說(shuō)明,不懂的同學(xué)可以去搜索一下。該模塊就是用來(lái)接收到一個(gè)數(shù)據(jù),按照HTTP請(qǐng)求格式進(jìn)行解析,得到各個(gè)關(guān)鍵要素放到Request中,讓HTTP請(qǐng)求的分析更加簡(jiǎn)單。我們直接看代碼:
class HttpRequest { public: std::string _method; // 請(qǐng)求方法 std::string _path; // 資源路徑 std::string _version; // 協(xié)議版本 std::string _body; // 請(qǐng)求正文 std::smatch _matches; // 資源路徑的正則提取數(shù)據(jù) std::unordered_map<std::string, std::string> _headers; // 頭部字段 std::unordered_map<std::string, std::string> _params; // 查詢字符串 public: HttpRequest() : _version("HTTP/1.1") {} void ReSet() { _method.clear(); _path.clear(); _version = "HTTP/1.1"; _body.clear(); std::smatch match; _matches.swap(match); _headers.clear(); _params.clear(); } // 插入頭部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判斷是否存在指定頭部字段 bool HasHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 獲取指定頭部字段的值 std::string GetHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } // 插入查詢字符串 void SetParam(const std::string &key, const std::string &val) { _params.insert(std::make_pair(key, val)); } // 判斷是否有某個(gè)指定的查詢字符串 bool HasParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return false; } return true; } // 獲取指定的查詢字符串 std::string GetParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return ""; } return it->second; } // 獲取正文長(zhǎng)度 size_t ContentLength() const { // Content-Length: 1234\r\n bool ret = HasHeader("Content-Length"); if (ret == false) { return 0; } std::string clen = GetHeader("Content-Length"); return std::stol(clen); } // 判斷是否是短鏈接 bool Close() const { // 沒(méi)有Connection字段,或者有Connection但是值是close,則都是短鏈接,否則就是長(zhǎng)連接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; } };
5、3 HttpResponse模塊
? 當(dāng)我們對(duì)Http請(qǐng)求進(jìn)行處理后,還要對(duì)客戶端進(jìn)行響應(yīng)。該模塊就是讓使用者向Response中填充響應(yīng)要素,完畢后將其組織成HTTP響應(yīng)格式的數(shù)據(jù),發(fā)給客戶端。Http的相應(yīng)格式就不再過(guò)多解釋?zhuān)覀冎苯涌磳?shí)現(xiàn)代碼:
class HttpResponse { public: int _statu; bool _redirect_flag; std::string _body; std::string _redirect_url; std::unordered_map<std::string, std::string> _headers; public: HttpResponse() : _redirect_flag(false) , _statu(200) {} HttpResponse(int statu) : _redirect_flag(false) , _statu(statu) {} void ReSet() { _statu = 200; _redirect_flag = false; _body.clear(); _redirect_url.clear(); _headers.clear(); } // 插入頭部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判斷是否存在指定頭部字段 bool HasHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 獲取指定頭部字段的值 std::string GetHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } void SetContent(const std::string &body, const std::string &type = "text/html") { _body = body; SetHeader("Content-Type", type); } void SetRedirect(const std::string &url, int statu = 302) { _statu = statu; _redirect_flag = true; _redirect_url = url; } // 判斷是否是短鏈接 bool Close() { // 沒(méi)有Connection字段,或者有Connection但是值是close,則都是短鏈接,否則就是長(zhǎng)連接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; } };
5、4 HttpContext模塊
??這個(gè)模塊是一個(gè)HTTP請(qǐng)求接收的上下文模塊,主要是為了防止在一次接收的數(shù)據(jù)中,不是一個(gè)完整的HTTP請(qǐng)求,則解析過(guò)程并未完成,無(wú)法進(jìn)行完整的請(qǐng)求處理,需要在下次接收到新數(shù)據(jù)后繼續(xù)根據(jù)上下文進(jìn)行解析,最終得到一個(gè)HttpRequest請(qǐng)求信息對(duì)象,因此在請(qǐng)求數(shù)據(jù)的接收以及解析部分需要一個(gè)上下文來(lái)進(jìn)行控制接收和處理節(jié)奏。
? 我們還對(duì)處于接收還是響應(yīng)狀態(tài)進(jìn)行了不同的設(shè)置。
? 接收狀態(tài):
- 當(dāng)前處理接受并且處理請(qǐng)求行的階段——接受請(qǐng)求行;
- 表示接收頭部的接收還沒(méi)處理完畢——接受請(qǐng)求頭部;
- 表示正文還沒(méi)有接受完畢——接受正文;
- 這是一個(gè)可以對(duì)數(shù)據(jù)請(qǐng)求處理的階段——接受數(shù)據(jù)處理完畢;
- 接受處理請(qǐng)求出錯(cuò)。
? 響應(yīng)狀態(tài):
- 在請(qǐng)求的接受并且處理的過(guò)程中,有可能會(huì)出現(xiàn)各種不同的問(wèn)題,解析出錯(cuò),訪問(wèn)的資源不對(duì),沒(méi)有權(quán)限等等。而這些錯(cuò)誤的響應(yīng)狀態(tài)碼都是不一樣的。
- 當(dāng)處理完畢,狀態(tài)就變成了已經(jīng)接受并且處理請(qǐng)求信息。
? 實(shí)現(xiàn)起來(lái)只要跟著我們的狀態(tài)變化的思路一步一步實(shí)現(xiàn)即可。具體實(shí)現(xiàn)接口如下:
- 接受請(qǐng)求行;
- 解析請(qǐng)求行;
- 接收頭部;
- 解析頭部;
- 接受正文;
- 返回解析完成的請(qǐng)求信息。
? 我們來(lái)看具體實(shí)現(xiàn)代碼:
#define MAX_LINE 8192 class HttpContext { private: int _resp_statu; // 響應(yīng)狀態(tài)碼 HttpRecvStatu _recv_statu; // 當(dāng)前接收及解析的階段狀態(tài) HttpRequest _request; // 已經(jīng)解析得到的請(qǐng)求信息 private: bool ParseHttpLine(const std::string &line) { std::smatch matches; std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase); bool ret = std::regex_match(line, matches, e); if (ret == false) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } // 0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1 // 1 : GET // 2 : /bitejiuyeke/login // 3 : user=xiaoming&pass=123123 // 4 : HTTP/1.1 // 請(qǐng)求方法的獲取 _request._method = matches[1]; std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper); // 資源路徑的獲取,需要進(jìn)行URL解碼操作,但是不需要+轉(zhuǎn)空格 _request._path = Util::UrlDecode(matches[2], false); // 協(xié)議版本的獲取 _request._version = matches[4]; // 查詢字符串的獲取與處理 std::vector<std::string> query_string_arry; std::string query_string = matches[3]; // 查詢字符串的格式 key=val&key=val....., 先以 & 符號(hào)進(jìn)行分割,得到各個(gè)字串 Util::Split(query_string, "&", &query_string_arry); // 針對(duì)各個(gè)字串,以 = 符號(hào)進(jìn)行分割,得到key 和val, 得到之后也需要進(jìn)行URL解碼 for (auto &str : query_string_arry) { size_t pos = str.find("="); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } std::string key = Util::UrlDecode(str.substr(0, pos), true); std::string val = Util::UrlDecode(str.substr(pos + 1), true); _request.SetParam(key, val); } return true; } bool RecvHttpLine(Buffer *buf) { if (_recv_statu != RECV_HTTP_LINE) return false; // 1. 獲取一行數(shù)據(jù),帶有末尾的換行 std::string line = buf->GetLineAndPop(); // 2. 需要考慮的一些要素:緩沖區(qū)中的數(shù)據(jù)不足一行, 獲取的一行數(shù)據(jù)超大 if (line.size() == 0) { // 緩沖區(qū)中的數(shù)據(jù)不足一行,則需要判斷緩沖區(qū)的可讀數(shù)據(jù)長(zhǎng)度,如果很長(zhǎng)了都不足一行,這是有問(wèn)題的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 緩沖區(qū)中數(shù)據(jù)不足一行,但是也不多,就等等新數(shù)據(jù)的到來(lái) return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } bool ret = ParseHttpLine(line); if (ret == false) { return false; } // 首行處理完畢,進(jìn)入頭部獲取階段 _recv_statu = RECV_HTTP_HEAD; return true; } bool RecvHttpHead(Buffer *buf) { if (_recv_statu != RECV_HTTP_HEAD) return false; // 一行一行取出數(shù)據(jù),直到遇到空行為止, 頭部的格式 key: val\r\nkey: val\r\n.... while (1) { std::string line = buf->GetLineAndPop(); // 2. 需要考慮的一些要素:緩沖區(qū)中的數(shù)據(jù)不足一行, 獲取的一行數(shù)據(jù)超大 if (line.size() == 0) { // 緩沖區(qū)中的數(shù)據(jù)不足一行,則需要判斷緩沖區(qū)的可讀數(shù)據(jù)長(zhǎng)度,如果很長(zhǎng)了都不足一行,這是有問(wèn)題的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 緩沖區(qū)中數(shù)據(jù)不足一行,但是也不多,就等等新數(shù)據(jù)的到來(lái) return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } if (line == "\n" || line == "\r\n") { break; } bool ret = ParseHttpHead(line); if (ret == false) { return false; } } // 頭部處理完畢,進(jìn)入正文獲取階段 _recv_statu = RECV_HTTP_BODY; return true; } bool ParseHttpHead(std::string &line) { // key: val\r\nkey: val\r\n.... if (line.back() == '\n') line.pop_back(); // 末尾是換行則去掉換行字符 if (line.back() == '\r') line.pop_back(); // 末尾是回車(chē)則去掉回車(chē)字符 size_t pos = line.find(": "); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // return false; } std::string key = line.substr(0, pos); std::string val = line.substr(pos + 2); _request.SetHeader(key, val); return true; } bool RecvHttpBody(Buffer *buf) { if (_recv_statu != RECV_HTTP_BODY) return false; // 1. 獲取正文長(zhǎng)度 size_t content_length = _request.ContentLength(); if (content_length == 0) { // 沒(méi)有正文,則請(qǐng)求接收解析完畢 _recv_statu = RECV_HTTP_OVER; return true; } // 2. 當(dāng)前已經(jīng)接收了多少正文,其實(shí)就是往 _request._body 中放了多少數(shù)據(jù)了 size_t real_len = content_length - _request._body.size(); // 實(shí)際還需要接收的正文長(zhǎng)度 // 3. 接收正文放到body中,但是也要考慮當(dāng)前緩沖區(qū)中的數(shù)據(jù),是否是全部的正文 // 3.1 緩沖區(qū)中數(shù)據(jù),包含了當(dāng)前請(qǐng)求的所有正文,則取出所需的數(shù)據(jù) if (buf->ReadAbleSize() >= real_len) { _request._body.append(buf->ReadPostion(), real_len); buf->MoveReadOffset(real_len); _recv_statu = RECV_HTTP_OVER; return true; } // 3.2 緩沖區(qū)中數(shù)據(jù),無(wú)法滿足當(dāng)前正文的需要,數(shù)據(jù)不足,取出數(shù)據(jù),然后等待新數(shù)據(jù)到來(lái) _request._body.append(buf->ReadPostion(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); return true; } public: HttpContext() : _resp_statu(200), _recv_statu(RECV_HTTP_LINE) {} void ReSet() { _resp_statu = 200; _recv_statu = RECV_HTTP_LINE; _request.ReSet(); } int RespStatu() { return _resp_statu; } HttpRecvStatu RecvStatu() { return _recv_statu; } HttpRequest &Request() { return _request; } // 接收并解析HTTP請(qǐng)求 void RecvHttpRequest(Buffer *buf) { // 不同的狀態(tài),做不同的事情,但是這里不要break, 因?yàn)樘幚硗暾?qǐng)求行后,應(yīng)該立即處理頭部,而不是退出等新數(shù)據(jù) switch (_recv_statu) { case RECV_HTTP_LINE: RecvHttpLine(buf); case RECV_HTTP_HEAD: RecvHttpHead(buf); case RECV_HTTP_BODY: RecvHttpBody(buf); } return; } };
? 下面對(duì)上述的主要成員函數(shù)的作用進(jìn)行簡(jiǎn)單講解一下:
- ParseHttpLine(const std::string &line): 解析HTTP請(qǐng)求行,根據(jù)正則表達(dá)式提取請(qǐng)求方法、資源路徑、查詢參數(shù)和協(xié)議版本,并進(jìn)行URL解碼操作。
- RecvHttpLine(Buffer *buf): 接收并解析HTTP請(qǐng)求行,從緩沖區(qū)中獲取一行數(shù)據(jù),判斷緩沖區(qū)數(shù)據(jù)是否足夠一行,如果不足則等待新數(shù)據(jù),如果超過(guò)最大行限制則返回錯(cuò)誤狀態(tài)碼,否則調(diào)用ParseHttpLine()函數(shù)解析請(qǐng)求行。
- ParseHttpHead(std::string &line): 解析HTTP請(qǐng)求頭部,根據(jù)鍵值對(duì)格式提取鍵和值,并保存到請(qǐng)求頭部對(duì)象中。
- RecvHttpHead(Buffer *buf): 接收并解析HTTP請(qǐng)求頭部,從緩沖區(qū)中逐行獲取數(shù)據(jù),直到遇到空行為止,每行都調(diào)用ParseHttpHead()函數(shù)解析并保存到請(qǐng)求頭部對(duì)象中。
- RecvHttpBody(Buffer *buf): 接收并處理HTTP請(qǐng)求正文,根據(jù)Content-Length頭部字段獲取正文長(zhǎng)度,然后從緩沖區(qū)中讀取對(duì)應(yīng)長(zhǎng)度的數(shù)據(jù)保存到請(qǐng)求正文對(duì)象中。
5、5 HttpServer模塊
這個(gè)模塊是最終給組件使用者提供的HTTP服務(wù)器模塊了,用于以簡(jiǎn)單的接口實(shí)現(xiàn)HTTP服務(wù)器的搭建。HttpServer模塊內(nèi)部包含有一個(gè)TcpServer對(duì)象:TcpServer對(duì)象實(shí)現(xiàn)服務(wù)器的搭建。HttpServer模塊內(nèi)部包含有兩個(gè)提供給TcpServer對(duì)象的接口∶連接建立成功設(shè)置上下文接口,數(shù)據(jù)處理接口。HttpServer模塊內(nèi)部包含有一個(gè)hash-map表存儲(chǔ)請(qǐng)求與處理函數(shù)的映射表,這個(gè)表由組件使用者向HttpServer設(shè)置哪些請(qǐng)求應(yīng)該使用哪些函數(shù)進(jìn)行處理,等TcpServer收到對(duì)應(yīng)的請(qǐng)求就會(huì)使用對(duì)應(yīng)的區(qū)數(shù)進(jìn)行處理。
我們?cè)賮?lái)看一下請(qǐng)求路由表:
表中記錄了針對(duì)哪個(gè)請(qǐng)求,應(yīng)該使用哪個(gè)函數(shù)來(lái)進(jìn)行業(yè)務(wù)處理的映射關(guān)系。當(dāng)服務(wù)器收到了一個(gè)請(qǐng)求,就在請(qǐng)求路由表中,查找有沒(méi)有對(duì)應(yīng)請(qǐng)求的處理函數(shù),如果有,則執(zhí)行對(duì)應(yīng)的處理函數(shù)即可。說(shuō)白了,什么請(qǐng)求,怎么處理,由用戶來(lái)設(shè)定,服務(wù)器收到了請(qǐng)求只需要執(zhí)行函數(shù)即可。這樣做的好處:用戶只需要實(shí)現(xiàn)業(yè)務(wù)處理函數(shù),然后將請(qǐng)求與處理函數(shù)的映射關(guān)系,添加到服務(wù)器中。而服務(wù)器只需要接收數(shù)據(jù),解析數(shù)據(jù),查找路由表映射關(guān)系,執(zhí)行業(yè)務(wù)處理函數(shù)。說(shuō)白了就是用戶只需要啟動(dòng)服務(wù)器,把請(qǐng)求所需要執(zhí)行的方法告訴服務(wù)器即可。
我們?cè)賮?lái)看一下要實(shí)現(xiàn)簡(jiǎn)便的搭建HTTP服務(wù)器,所需要的要素和提供的功能和要素。
所需要蘇:
- GET請(qǐng)求的路由映射表;
- POST請(qǐng)求的路由映射表;
- PUT請(qǐng)求的路由映射表;
- DELETE請(qǐng)求的路由映射表 —— 路由映射表記錄對(duì)應(yīng)請(qǐng)求方法的請(qǐng)求的處理函數(shù)映射關(guān)系;
- 高性能TCP服務(wù)器—— 進(jìn)行連接的IO操作;
- 靜態(tài)資源相對(duì)根目錄 —— 實(shí)現(xiàn)靜態(tài)資源的處理。
服務(wù)器的處理流程:
- 從socket接受數(shù)據(jù)放到接受緩沖區(qū);
- 調(diào)用OnMessage回調(diào)函數(shù)進(jìn)行業(yè)務(wù)處理;
- 對(duì)請(qǐng)求進(jìn)行解析,得到了一個(gè)HTTPREQUEST結(jié)構(gòu),包含了所有的請(qǐng)求要素!
- 進(jìn)行請(qǐng)求的路由映射 —— 找到對(duì)應(yīng)請(qǐng)求的處理方法
- 靜態(tài)資源請(qǐng)求 —— 一些實(shí)體文件資源的請(qǐng)求 html,image,將靜態(tài)資源文件的數(shù)據(jù)讀取出來(lái),填充到HTTPresponse結(jié)構(gòu)中
- 功能性請(qǐng)求 —— 在請(qǐng)求路由映射表中查找處理函數(shù),找到了則執(zhí)行函數(shù),具體的業(yè)務(wù)請(qǐng)求,并進(jìn)行HTTPREsponse結(jié)構(gòu)的數(shù)據(jù)填充
- 對(duì)靜態(tài)資源請(qǐng)求——功能性請(qǐng)求處理完畢后,得到一個(gè)填充了相應(yīng)信息的httpResponse 的對(duì)象,組織http響應(yīng)格式進(jìn)行發(fā)送!
所需接口如下:
- 添加請(qǐng)求-處理函數(shù)映射信息(GET/POST/PUT/DELETE);
- 設(shè)置靜態(tài)資源根目錄;
- 設(shè)置是否啟動(dòng)超時(shí)連接關(guān)閉;
- 設(shè)置線程池中線程數(shù)量;
- 啟動(dòng)服務(wù)器;
- OnConnected ---用于給TcpServer設(shè)置協(xié)議上下文;
- OnMessage -----用于進(jìn)行緩沖區(qū)數(shù)據(jù)解析處理;
- 請(qǐng)求的路由查找(靜態(tài)資源請(qǐng)求查找和處理功能性請(qǐng)求的查找和處理);
- 組織響應(yīng)進(jìn)行回復(fù)。
#define DEFAULT_TIMEOUT 30 class HttpServer { using Handler = std::function<void(const HttpRequest &, HttpResponse *)>; using Handlers = std::vector<std::pair<std::regex, Handler>>; private: void ErrorHandler(const HttpRequest &req, HttpResponse* rsp) { // 組織一個(gè)錯(cuò)誤展示頁(yè)面 std::string body; body += "<html>"; body += "<head>"; body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>"; body += "</head>"; body += "<body>"; body += "<h1>"; body += std::to_string(rsp->_statu); body += " "; body += Util::StatuDesc(rsp->_statu); body += "</h1>"; body += "</body>"; body += "</html>"; // 2. 將頁(yè)面數(shù)據(jù),當(dāng)作響應(yīng)正文,放入rsp中 rsp->SetContent(body, "text/html"); } //將HttpResponse中的要素按照http協(xié)議格式進(jìn)行組織,發(fā)送 void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) { if(req.Close() == true) { rsp.SetHeader("Connection", "close"); } else { rsp.SetHeader("Connection", "keep-alive"); } if(rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) { rsp.SetHeader("Content-Length", std::to_string(rsp._body.size())); } if(rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) { rsp.SetHeader("Content-Type", "application/octet-stream"); } if(rsp._redirect_flag == true) { rsp.SetHeader("Location", rsp._redirect_url); } std::stringstream rsp_str; rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu); for(auto &head : rsp._headers) { rsp_str << head.first << ": " << head.second << "\r\n"; } rsp_str << "\r\n"; rsp_str << rsp._body; conn->Send(rsp_str.str().c_str(), rsp_str.str().size()); } bool IsFileHandler(const HttpRequest& req) { // 1. 必須設(shè)置了靜態(tài)資源根目錄 if (_basedir.empty()) { return false; } // 2. 請(qǐng)求方法,必須是GET / HEAD請(qǐng)求方法 if (req._method != "GET" && req._method != "HEAD") { return false; } // 3. 請(qǐng)求的資源路徑必須是一個(gè)合法路徑 if (Util::ValidPath(req._path) == false) { return false; } // 4. 請(qǐng)求的資源必須存在,且是一個(gè)普通文件 // 有一種請(qǐng)求比較特殊 -- 目錄:/, /image/, 這種情況給后邊默認(rèn)追加一個(gè) index.html // index.html /image/a.png // 不要忘了前綴的相對(duì)根目錄,也就是將請(qǐng)求路徑轉(zhuǎn)換為實(shí)際存在的路徑 /image/a.png -> ./wwwroot/image/a.png std::string req_path = _basedir + req._path; // 為了避免直接修改請(qǐng)求的資源路徑,因此定義一個(gè)臨時(shí)對(duì)象 if (req._path.back() == '/') { req_path += "index.html"; } if (Util::IsRegular(req_path) == false) { return false; } return true; } void FileHandler(const HttpRequest &req, HttpResponse *rsp) { std::string req_path = _basedir + req._path; if (req._path.back() == '/') { req_path += "index.html"; } bool ret = Util::ReadFile(req_path, &rsp->_body); if (ret == false) { return; } std::string mime = Util::ExtMime(req_path); rsp->SetHeader("Content-Type", mime); return; } // 功能性請(qǐng)求的分類(lèi)處理 void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) { // 在對(duì)應(yīng)請(qǐng)求方法的路由表中,查找是否含有對(duì)應(yīng)資源請(qǐng)求的處理函數(shù),有則調(diào)用,沒(méi)有則發(fā)揮404 // 思想:路由表存儲(chǔ)的時(shí)鍵值對(duì) -- 正則表達(dá)式 & 處理函數(shù) // 使用正則表達(dá)式,對(duì)請(qǐng)求的資源路徑進(jìn)行正則匹配,匹配成功就使用對(duì)應(yīng)函數(shù)進(jìn)行處理 // /numbers/(\d+) /numbers/12345 for (auto &handler : handlers) { const std::regex &re = handler.first; const Handler &functor = handler.second; bool ret = std::regex_match(req._path, req._matches, re); if (ret == false) { continue; } return functor(req, rsp); // 傳入請(qǐng)求信息,和空的rsp,執(zhí)行處理函數(shù) } rsp->_statu = 404; } void Route(HttpRequest &req, HttpResponse *rsp) { // 1. 對(duì)請(qǐng)求進(jìn)行分辨,是一個(gè)靜態(tài)資源請(qǐng)求,還是一個(gè)功能性請(qǐng)求 // 靜態(tài)資源請(qǐng)求,則進(jìn)行靜態(tài)資源的處理 // 功能性請(qǐng)求,則需要通過(guò)幾個(gè)請(qǐng)求路由表來(lái)確定是否有處理函數(shù) // 既不是靜態(tài)資源請(qǐng)求,也沒(méi)有設(shè)置對(duì)應(yīng)的功能性請(qǐng)求處理函數(shù),就返回405 if (IsFileHandler(req) == true) { // 是一個(gè)靜態(tài)資源請(qǐng)求, 則進(jìn)行靜態(tài)資源請(qǐng)求的處理 return FileHandler(req, rsp); } if (req._method == "GET" || req._method == "HEAD") { return Dispatcher(req, rsp, _get_route); } else if (req._method == "POST") { return Dispatcher(req, rsp, _post_route); } else if (req._method == "PUT") { return Dispatcher(req, rsp, _put_route); } else if (req._method == "DELETE") { return Dispatcher(req, rsp, _delete_route); } rsp->_statu = 405; // Method Not Allowed return; } // 設(shè)置上下文 void OnConnected(const PtrConnection &conn) { conn->SetContext(HttpContext()); DBG_LOG("NEW CONNECTION %p", conn.get()); } // 緩沖區(qū)數(shù)據(jù)解析+處理 void OnMessage(const PtrConnection &conn, Buffer *buffer) { while (buffer->ReadAbleSize() > 0) { // 1. 獲取上下文 HttpContext *context = conn->GetContext()->get<HttpContext>(); // 2. 通過(guò)上下文對(duì)緩沖區(qū)數(shù)據(jù)進(jìn)行解析,得到HttpRequest對(duì)象 // 1. 如果緩沖區(qū)的數(shù)據(jù)解析出錯(cuò),就直接回復(fù)出錯(cuò)響應(yīng) // 2. 如果解析正常,且請(qǐng)求已經(jīng)獲取完畢,才開(kāi)始去進(jìn)行處理 context->RecvHttpRequest(buffer); HttpRequest &req = context->Request(); HttpResponse rsp(context->RespStatu()); if (context->RespStatu() >= 400) { // 進(jìn)行錯(cuò)誤響應(yīng),關(guān)閉連接 ErrorHandler(req, &rsp); // 填充一個(gè)錯(cuò)誤顯示頁(yè)面數(shù)據(jù)到rsp中 WriteReponse(conn, req, rsp); // 組織響應(yīng)發(fā)送給客戶端 context->ReSet(); buffer->MoveReadOffset(buffer->ReadAbleSize()); // 出錯(cuò)了就把緩沖區(qū)數(shù)據(jù)清空 conn->Shutdown(); // 關(guān)閉連接 return; } if (context->RecvStatu() != RECV_HTTP_OVER) { // 當(dāng)前請(qǐng)求還沒(méi)有接收完整,則退出,等新數(shù)據(jù)到來(lái)再重新繼續(xù)處理 return; } // 3. 請(qǐng)求路由 + 業(yè)務(wù)處理 Route(req, &rsp); // 4. 對(duì)HttpResponse進(jìn)行組織發(fā)送 WriteReponse(conn, req, rsp); // 5. 重置上下文 context->ReSet(); // 6. 根據(jù)長(zhǎng)短連接判斷是否關(guān)閉連接或者繼續(xù)處理 if (rsp.Close() == true) conn->Shutdown(); // 短鏈接則直接關(guān)閉 } return; } public: HttpServer(int port, int timeout = DEFAULT_TIMEOUT) : _server(port) { _server.EnableInactiveRelease(timeout); _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void SetBaseDir(const std::string &path) { assert(Util::IsDirectory(path) == true); _basedir = path; } /*設(shè)置/添加,請(qǐng)求(請(qǐng)求的正則表達(dá))與處理函數(shù)的映射關(guān)系*/ void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Post(const std::string &pattern, const Handler &handler) { _post_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Put(const std::string &pattern, const Handler &handler) { _put_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Delete(const std::string &pattern, const Handler &handler) { _delete_route.push_back(std::make_pair(std::regex(pattern), handler)); } void SetThreadCount(int count) { _server.SetThreadCount(count); } void Listen() { _server.Start(); } private: Handlers _get_route; Handlers _post_route; Handlers _put_route; Handlers _delete_route; std::string _basedir; TcpServer _server; };
六、對(duì)服務(wù)器進(jìn)行測(cè)試
6、1 長(zhǎng)連接測(cè)試
? 我們知道一個(gè)長(zhǎng)連接,當(dāng)請(qǐng)求完一次資源后,并不會(huì)直接斷開(kāi)連接,而是仍然可以向服務(wù)器請(qǐng)求資源。短連接則就是請(qǐng)求一次資源后直接斷開(kāi)連接。對(duì)長(zhǎng)連接測(cè)試思路:一個(gè)連接中每隔3s向服務(wù)器發(fā)送一個(gè)請(qǐng)求,查看是否會(huì)收到響應(yīng),同時(shí)直到超過(guò)超時(shí)時(shí)間看看是否正常。測(cè)試代碼如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); /*長(zhǎng)連接測(cè)試:創(chuàng)建一個(gè)客戶端持續(xù)給服務(wù)器發(fā)送數(shù)據(jù),直到超過(guò)超時(shí)時(shí)間看看是否正常*/ std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0; }
? 超時(shí)時(shí)間是10s,如下圖:
6、2 不完整報(bào)文請(qǐng)求
? 我們知道,再給服務(wù)器發(fā)送數(shù)據(jù)時(shí),都會(huì)攜帶一個(gè)Content-length屬性,表示有效數(shù)據(jù)的長(zhǎng)度。當(dāng)時(shí)我們現(xiàn)在給服務(wù)器發(fā)送一個(gè)數(shù)據(jù),告訴服務(wù)器要發(fā)送1024字節(jié)的數(shù)據(jù),但是實(shí)際發(fā)送的數(shù)據(jù)不足1024,查看服務(wù)器處理結(jié)果。其實(shí)我們也能想出來(lái)結(jié)果:
如果數(shù)據(jù)只發(fā)送一次,服務(wù)器將得不到完整請(qǐng)求,就不會(huì)進(jìn)行業(yè)務(wù)處理,客戶端也就得不到響應(yīng),最終超時(shí)關(guān)閉連接。
連著給服務(wù)器發(fā)送了多次小的請(qǐng)求,服務(wù)器會(huì)將后邊的請(qǐng)求當(dāng)作前邊請(qǐng)求的正文進(jìn)行處理,而后便處理的時(shí)候有可能就會(huì)因?yàn)樘幚礤e(cuò)誤而關(guān)閉連接。
? 測(cè)試代碼如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nGgggggtm"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); // assert(cli_sock.Send(req.c_str(), req.size()) != -1); // assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0; }
? 這里結(jié)果就不再給大家展示,大家可自行測(cè)試。
6、3?業(yè)務(wù)處理超時(shí)測(cè)試
? ?接收請(qǐng)求的數(shù)據(jù),但是業(yè)務(wù)處理的時(shí)間過(guò)長(zhǎng),超過(guò)了設(shè)置的超時(shí)銷(xiāo)毀時(shí)間(服務(wù)器性能達(dá)到瓶頸),觀察服務(wù)端的處理。預(yù)期結(jié)果:在一次業(yè)務(wù)處理中耗費(fèi)太長(zhǎng)時(shí)間,導(dǎo)致其他連接被連累超時(shí),導(dǎo)致其他的連接有可能會(huì)超時(shí)釋放。
??假設(shè)有12345描述符就緒了,在處理1的時(shí)候花費(fèi)了30s處理完,超時(shí)了,導(dǎo)致2345描述符因?yàn)殚L(zhǎng)時(shí)間沒(méi)有刷新活躍度,則存在兩種可能處理結(jié)果:
- 如果接下來(lái)的2345描述符都是通信連接描述符,恰好本次也都就緒了事件,則并不影響,因?yàn)榈?處理完了,接下來(lái)就會(huì)進(jìn)行處理并刷新活躍度。
- 如果接下來(lái)的2號(hào)描述符是定時(shí)器事件描述符,定時(shí)器觸發(fā)超時(shí),執(zhí)行定時(shí)任務(wù),就會(huì)將345描述符給釋放掉,這時(shí)候一旦345描述符對(duì)應(yīng)的連接被釋放,接下來(lái)在處理345事件的時(shí)候就會(huì)導(dǎo)致程序崩潰(內(nèi)存訪問(wèn)錯(cuò)誤)。
? 因此,在任意的事件處理中,都不應(yīng)該直接對(duì)連接進(jìn)行釋放,而應(yīng)該將釋放操作壓入到任務(wù)池中,等所有連接事件處理完了,然后執(zhí)行任務(wù)池中的任務(wù)的時(shí)候再去進(jìn)行釋放。
? 測(cè)試代碼如下:
int main() { signal(SIGCHLD, SIG_IGN); for (int i = 0; i < 10; i++) { pid_t pid = fork(); if (pid < 0) { DBG_LOG("FORK ERROR"); return -1; }else if (pid == 0) { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); } cli_sock.Close(); exit(0); } } while(1) sleep(1); return 0; }
? 我們只需要將業(yè)務(wù)處理休眠上15秒,即超過(guò)超時(shí)即可。具體如下去:
6、4?一次發(fā)送多條數(shù)據(jù)測(cè)試
? 給服務(wù)器發(fā)送的一條數(shù)據(jù)中包含有多個(gè)HTTP請(qǐng)求,觀察服務(wù)器的處理。預(yù)期結(jié)果:每一條請(qǐng)求都有其對(duì)應(yīng)的響應(yīng)。
? 測(cè)試代碼如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0; }
6、5?大文件傳輸測(cè)試
?使用put請(qǐng)求上傳一個(gè)大文件進(jìn)行保存,大文件數(shù)據(jù)的接收會(huì)被分在多次請(qǐng)求中接收,然后計(jì)算源文件和上傳后保存的文件的MD5值,判斷請(qǐng)求的接收處埋是否存在問(wèn)題。(這里主要觀察的是上下文的處理過(guò)程是否正常。)測(cè)試代碼如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n"; std::string body; Util::ReadFile("./hello.txt", &body); req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n"; assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(body.c_str(), body.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); cli_sock.Close(); return 0; }
? 我們還需要將put方法的處理進(jìn)行修改,如下圖:
6、6 性能測(cè)試
? 首先說(shuō)明一下服務(wù)器測(cè)試環(huán)境:云服務(wù)器。配置為:CPU 2核 - 內(nèi)存2GB,帶寬:4Mbps。服務(wù)器程序采用1主3從reactor模式。具體如下圖:
?
? 正常情況下,客戶端應(yīng)該不再使用該同一臺(tái)服務(wù)器,因?yàn)闀?huì)搶占云服務(wù)器資源。我們先來(lái)看一下在該服務(wù)器上進(jìn)行本地還會(huì)測(cè)試。具體測(cè)試怎么進(jìn)行呢?
? ?我們采用了webbench工具。其原理是:創(chuàng)建大量的進(jìn)程,在進(jìn)程中創(chuàng)建客戶端連接服務(wù)器發(fā)送請(qǐng)求,收到響應(yīng)后關(guān)閉連接,開(kāi)始下一個(gè)連接的建立。我們先使用webbench進(jìn)行500并發(fā)量如下:
? 運(yùn)行完后的結(jié)果:
??QPS(每秒處理的包的數(shù)量)為2050。處理失敗的包并沒(méi)有。也就是500并發(fā)量沒(méi)有任何問(wèn)題。
? ?接下來(lái)我們?cè)賮?lái)看一下處理5000并發(fā)量如何呢。如下圖:
? 我們?cè)賮?lái)看一下結(jié)果:
? QPS(每秒處理的包的數(shù)量)大概為2000左右。處理失敗的包也并沒(méi)有。也就是5000并發(fā)量沒(méi)有任何問(wèn)題。
? ?我們?cè)賮?lái)看一下處理10000的并發(fā)量如何。具體如下圖:
? 運(yùn)行結(jié)果如下:
?? QPS(每秒處理的包的數(shù)量)大概為2000左右。處理失敗的包也并沒(méi)有。也就是輕輕松松可處理上萬(wàn)的并發(fā)量。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-845736.html
? 以上測(cè)試中,使用瀏覽器訪問(wèn)服務(wù)器,均能流暢獲取請(qǐng)求的頁(yè)面。但是根據(jù)測(cè)試結(jié)果能夠看出,雖然并發(fā)量一直在提高,但是總的請(qǐng)求服務(wù)器的數(shù)量并沒(méi)有增加,反而有所降低,側(cè)面反饋了處理所耗時(shí)間更多了,基本上可以根據(jù)12w/min左右的請(qǐng)求量計(jì)算出10000并發(fā)量時(shí)服務(wù)器的極限了,但是這個(gè)測(cè)試其實(shí)意義不大,因?yàn)闇y(cè)試客戶端和服務(wù)器都在同一臺(tái)機(jī)器上,專(zhuān)輸?shù)乃俣雀?,但同時(shí)搶占cpu也影響了處理,最好的方式就是在兩臺(tái)不同的機(jī)器上進(jìn)行測(cè)試,這里只是通過(guò)這個(gè)方法告訴大家該如何對(duì)服務(wù)器進(jìn)行性能測(cè)試。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-845736.html
到了這里,關(guān)于仿muduo庫(kù)實(shí)現(xiàn)one thread one loop式并發(fā)服務(wù)器的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!