目錄
1.實現(xiàn)目標
2.HTTP服務器
3.Reactor模型
3.1分類
4.功能模塊劃分:
4.1SERVER模塊:
4.2HTTP協(xié)議模塊:
5.簡單的秒級定時任務實現(xiàn)
5.1Linux提供給我們的定時器
5.2時間輪思想:
6.正則庫的簡單使用
7.通用類型any類型的實現(xiàn)
8.日志宏的實現(xiàn)
9.緩沖區(qū)buffer類的實現(xiàn)
10.套接字Socket類實現(xiàn)
11.Channel模塊實現(xiàn)
13.Poller模塊實現(xiàn)
14.定時任務管理TimerWheel模塊實現(xiàn)
15.Reactor-EventLoop線程池模塊實現(xiàn)
15.1eventfd介紹
15.2EventLoop模塊
15.3EventLoop模塊調(diào)用關系圖
15.4EventLoop模塊和線程整合起來
15.5LoopThread線程池實現(xiàn)
16.Connection模塊實現(xiàn)
17.監(jiān)聽描述符管理Acceptor模塊實現(xiàn)
18.TcpServer模塊實現(xiàn)
19.基于TcpServer實現(xiàn)回顯服務器
19.1EchoServer模塊關系圖
20.HTTP協(xié)議?持模塊實現(xiàn)
20.1Util實??具類實現(xiàn)
20.2HttpRequest請求類實現(xiàn)
20.3HttpResponse響應類實現(xiàn)
20.4HttpContext上下文類實現(xiàn)
20.5HttpServer類實現(xiàn)
20.6基于HttpServer搭建HTTP服務器
21.功能測試
21.1使用Postman進行基本功能測試
21.2長連接連續(xù)請求測試
21.3超時連接測試
21.4不完整請求測試
21.5業(yè)務處理超時測試
21.6一次發(fā)送多條數(shù)據(jù)測試
21.7大文件傳輸測試
21.8抗壓力測試
代碼倉庫:https://gitee.com/lc-yulin/server
1.實現(xiàn)目標
仿muduo庫One Thread One Loop式主從Reactor模型實現(xiàn)高并發(fā)服務器:
通過實現(xiàn)的高并發(fā)服務器組件,可以簡潔快速的完成一個高性能的服務器搭建。并且,通過組件內(nèi)提供的不同應用層協(xié)議支持,也可以快速完成一個高性能應用服務器的搭建(當前為了便于項目的演示,項目中提供HTTP協(xié)議組件的支持)
在這里,要明確的是要實現(xiàn)的是一一個高并發(fā)服務器組件,因此當前的項目中并不包含實際的業(yè)務內(nèi)容。
2.HTTP服務器
概念:
HTTP (Hyper Text Transfer Protocol),超文本傳輸協(xié)議是應用層協(xié)議,是一種簡單的請求響應協(xié)
議(客戶端根據(jù)自己的需要向服務器發(fā)送請求,服務器針對請求提供服務,完畢后通信結束)。
HTTP協(xié)議是一個運行在TCP協(xié)議之上的應用層協(xié)議,這一點本質(zhì)上是告訴我們,HTTP服務器其實就是個TCP服務器,只不過在應用層基于HTTP協(xié)議格式進行數(shù)據(jù)的組織和解析來明確客戶端的請求并完成業(yè)務處理。因此實現(xiàn)HTTP服務器簡單理解,只需要以下幾步即可
1.搭建一個TCP服務器,接收客戶端請求。
2.以HTTP協(xié)議格式進行解析請求數(shù)據(jù),明確客戶端目的。
3.明確客戶端請求目的后提供對應服務。
4.將服務結果- -HTTP協(xié)議格式進行組織,發(fā)送給客戶端
實現(xiàn)一個HTTP服務器很簡單,但是實現(xiàn)一個高性能的服務器并不簡單,這個單元中將講解基于
Reactor模式的高性能服務器實現(xiàn)。當然準確來說,因為要實現(xiàn)的服務器本身并不存在業(yè)務,咱們要實現(xiàn)的應該算是一個高性能服務器基礎庫,是一個基礎組件。
3.Reactor模型
概念
Reactor模式,是指通過一個或多個輸入同時傳遞給服務器進行請求處理時的事件驅(qū)動處理模式。
服務端程序處理傳入多路請求,并將它們同步分派給請求對應的處理線程,Reactor 模式也叫
Dispatcher模式。
簡單理解就是使用I/0多路復用統(tǒng)一監(jiān)聽事件,收到事件后分發(fā)給處理進程或線程,是編寫高性能
網(wǎng)絡服務器的必備技術之一。
3.1分類
單Reactor單線程:單I/O多路復用+業(yè)務處理
1.通過I0多路復用模型進行客戶端請求監(jiān)控
2.觸發(fā)事件后,進行事件處理
????????a.如果是新建連接請求,則獲取新建連接,并添加至多路復用模型進行事件監(jiān)控。
????????b.如果是數(shù)據(jù)通信請求,則進行對應數(shù)據(jù)處理(接收數(shù)據(jù),處理數(shù)據(jù),發(fā)送響應)
優(yōu)點:所有操作在同-個線程中完成,思想編碼較為簡單,不涉及資源問題搶奪問題
缺點:無法有效利用CPU多核資源,容易達到性能瓶頸
使用場景:翻于客戶端數(shù)量較少的場景
單Reactor多線程:
1. Reactor線程通過I/0多路復用模型進行客戶端請求監(jiān)控
2.觸發(fā)事件后,進行事件處理
????????a.如果是新建連接請求,則獲取新建連接,并添加至多路復用模型進行事件監(jiān)控。
????????b.如果是數(shù)據(jù)通信請求,則接收數(shù)據(jù)后分發(fā)給Worker線程池進行業(yè)務處理。
????????c.工作線程處理完畢后,將響應交給Reactor線程進行數(shù)據(jù)響應
優(yōu)點:充分利用CPU多核資源
缺點:多線程間的數(shù)據(jù)共享訪問控制較為復雜,單個Reactor承擔所有事件的監(jiān)聽和響應,在單線程中運行,高并發(fā)場景下容易成為性能瓶頸。
多Reactor多線程:多|/O多路復用+線程池(業(yè)務處理)
1.在主Reactor中處理新連接請求事件,有新連接到來則分發(fā)到子Reactor中監(jiān)控
2.在子Reactor中進行客戶端通信監(jiān)控,有事件觸發(fā),則接收數(shù)據(jù)分發(fā)給Worker線程池
3. Worker線程 池分配獨立的線程進行具體的業(yè)務處理
a.工作線程處理完畢后,將響應交給子Reactor線程進行數(shù)據(jù)響應
優(yōu)點:充分利用CPU多核資源,主從Reactor各司其職
目標定位: One Thread One Loop主從Reactor模型高并發(fā)服務器
要實現(xiàn)的是主從Reactor模型服務器,也就是主Reactor線程僅僅監(jiān)控監(jiān)聽描述符,獲取新建連接,保證獲取新連接的高效性,提高服務器的并發(fā)性能。
主Reactor獲取到新連接后分發(fā)給子Reactor進行通信事件監(jiān)控。而子Reactor線程 監(jiān)控各自的描述符的讀寫事件進行數(shù)據(jù)讀寫以及業(yè)務處理。
One Thread One Loop的思想就是把所有的操作都放到一個線程 中進行,一個線程對應一個事件處理的循環(huán)。
當前實現(xiàn)中,因為并不確定組件使用者的使用意向,因此并不提供業(yè)務層工作線程池的實現(xiàn),只實現(xiàn)主從Reactor,而Worker工作線程池,可由組件庫的使用者的需要自行決定是否使用和實現(xiàn)。
4.功能模塊劃分:
基于以上的理解,我們要實現(xiàn)的是一一個帶有協(xié)議支持的Reactor模型高性能服務器,因此將整個項目的實現(xiàn)劃分為兩個大的模塊:
●SERVER模塊: 實現(xiàn)Reactor模型的TCP服務器;
●協(xié)議模塊:對當前的Reactor模型服務器提供應用層協(xié)議支持。
4.1SERVER模塊:
SERVER模塊就是對所有的連接以及線程進行管理,讓它們各司其職,在合適的時候做合適的事,最終完成高性能服務器組件的實現(xiàn)。而具體的管理也分為三個方面:
●監(jiān)聽連接管理: 對監(jiān)聽連接進行管理。
●通信連接管理:對通信連接進行管理。
●超時連接 管理:對超時連接進行管理
基于以上的管理思想,將這個模塊進行細致的劃分又可以劃分為以下多個子模塊:
Buffer模塊:
功能:用于實現(xiàn)套接字的用戶緩沖區(qū)
意義:
????????a.防止接收到的數(shù)據(jù)不是完整的數(shù)據(jù),因此對接受的數(shù)據(jù)進行緩存
????????b.對于客戶端響應的數(shù)據(jù),應該是套接字可寫的情況下進行發(fā)送
功能設計:
????????a.向緩沖區(qū)添加數(shù)據(jù)
????????b.從緩沖區(qū)中取出數(shù)據(jù)
Socket模塊:
功能:對socket套接字的操作進行封裝
意義:程序中對于套接字的各項操作更加便捷
功能設計:
? ? ? ? a.創(chuàng)建套接字
? ? ? ? b.綁定地址信息
? ? ? ? c.開始監(jiān)聽
? ? ? ? d.向服務器發(fā)起連接
? ? ? ? e.獲取新連接
? ? ? ? f.接受數(shù)據(jù)
? ? ? ? g.發(fā)送數(shù)據(jù)
? ? ? ? h.關閉套接字
? ? ? ? i.創(chuàng)建一個監(jiān)聽連接
? ? ? ? j.創(chuàng)建一個客戶端連接
Channe|模塊:
功能:對于一個描述符進行監(jiān)控事件管理
意義:對于描述符的監(jiān)控事件在用戶態(tài)更容易維護,以及觸發(fā)事件后的操作流程更加清晰
功能設計:
a.對監(jiān)控事件的管理:
????????描述符是否可讀描述符是否可寫對描述符監(jiān)控可讀對描述符監(jiān)控可寫
????????解除可讀事件監(jiān)控解除可寫事件監(jiān)控解除所有事件監(jiān)控
b.對監(jiān)控事件觸發(fā)后的處理
????????設置不同的回調(diào)函數(shù)->明確出發(fā)了某個事件之后應該如何處理
Connection模塊
Connection模塊是對Buffer模塊,Socket模塊, Channel模塊的一個整體封裝,實現(xiàn)了對一個通信套接字的整體的管理,每一個進行數(shù)據(jù)通信的套接字(也就是accept獲取到的新連接)都會使用Connection進行管理。
●Connection模塊內(nèi)部包含有三個由組件使用者傳入的回調(diào)函數(shù):連接建立完成回調(diào),事件回調(diào),新數(shù)據(jù)回調(diào),關閉回調(diào)。
●Connection模塊內(nèi)部包含有兩個組件使用者提供的接口:數(shù)據(jù)發(fā)送接口,連接關閉接口,●Connection模塊內(nèi)部包含有兩個用戶態(tài)緩沖區(qū):用戶態(tài)接收緩沖區(qū),用戶態(tài)發(fā)送緩沖區(qū)
●Connection模塊內(nèi)部包含有一個Socket對象:完成描述符面向系統(tǒng)的I0操作
●Connection模塊內(nèi)部包含有-個Channel對象:完成描述符I0事件就緒的處理
具體處理流程如下:
1.實現(xiàn)向Channel提供可讀,可寫,錯誤等不同事件的I0事件回調(diào)函數(shù),然后將Channel和對應的描述符添加到Poller事件監(jiān)控中。
2.當描述符在Poller模塊中就緒了I0可讀事件,則調(diào)用描述符對應Channel中保存的讀事件處理函數(shù),進行數(shù)據(jù)讀取,將socket接收緩沖區(qū)全部讀取到Connection管理的用戶態(tài)接收緩沖區(qū)中。然后調(diào)用由組件使用者傳入的新數(shù)據(jù)到來回調(diào)函數(shù)進行處理。
3.組件使用者進行數(shù)據(jù)的業(yè)務處理完畢后,通過Connection向使用者提供的數(shù)據(jù)發(fā)送接口,將數(shù)據(jù).寫入Connection的發(fā)送緩沖區(qū)中。
4.啟動描述符在Pol模塊中的I0寫事件監(jiān)控,就緒后,調(diào)用Channel中保存的寫事件處理函數(shù),將發(fā)送緩沖區(qū)中的數(shù)據(jù)通過Socket進行面向系統(tǒng)的實際數(shù)據(jù)發(fā)送。
Acceptor模塊:
Acceptor模塊是對Socket模塊,Channel模塊的一個整體封裝,實現(xiàn)了對一個監(jiān)聽套接字的整體的管理。
●Acceptor模塊內(nèi)部包含有-個Socket對象:實現(xiàn)監(jiān)聽套接字的操作
●Acceptor模塊內(nèi)部包含有一個Channel對象:實現(xiàn)監(jiān)聽套接字I0事件就緒的處理
具體處理流程如下:
1.實現(xiàn)向Channel提供可讀事件的10事件處理回調(diào)函數(shù),函數(shù)的功能其實也就是獲取新連接
2.為新連接構建一個Connection對象出來。
TimerQueue模塊:
功能:定時任務模塊,讓一個任務可以在指定的時間之后執(zhí)行
意義:組件內(nèi)部,對于非活躍連接希望在N秒時候被釋放
功能設計:
????????a.添加定時任務
????????b.刷新定時任務->希望一個定時任務重新開始計時
????????c.取消定時任務
Poller模塊:
功能:對任意的描述符進行I0事件監(jiān)控
意義:對epolI進行封裝,讓對描述符進行事件監(jiān)控的操作更加簡單
功能接口:
? ? ? ? a.添加事件監(jiān)控->Channel模塊
? ? ? ? b.修改事件監(jiān)控
? ? ? ? c.移除事件監(jiān)控
EventLoop模塊:
EventLoop模塊可以理解就是我們上邊所說的Reactor模塊,它是對Poller模塊 ,TimerQueue模塊,Socket模塊的一個整體封裝,進行所有描述符的事件監(jiān)控。
EventLoop模塊必然是一個對象對應一個線程的模塊,線程內(nèi)部的目的就是運行EventLoop的啟動函數(shù)。
EventLoop模塊為了保證整個服務器的線程安全問題,因此要求使用者對于Connection的所有操作一定要在其對應的EventLoop線程內(nèi)完成,不能在其他線程中進行(比如組件使用者使Connection發(fā)送數(shù)據(jù),以及關閉連接這種操作)。
EventLoop模塊保證自己內(nèi)部所監(jiān)控的所有描述符,都要是活躍連接,非活躍連接就要及時釋放避免資源浪費。
EventLoop模塊內(nèi)部包含有一個eventfd: eventfd其 實就是linux內(nèi)核提供的一個事件fd,專門用于事件通知。
●EventLoop模塊內(nèi)部包含有一個Poller對象:用于進行描述符的I0事件監(jiān)控。
●EventL oop模塊內(nèi)部包含有一個TimerQueue對象:用于進行定時任務的管理。
●EventL oop模塊內(nèi)部包含有一個PendingTask隊列:組件使用者將對Connection進行的所有操作,都加入到任務隊列中,由EventLoop模塊進行管理,并在EventLoop對應的線程 中進行執(zhí)行。●每一個Connection對象都會綁定到一個EventLoop.上,這樣能保證對這個連接的所有操作都是在一個線程中完成的。
具體操作流程:
1.通過Poller模塊對當前模塊管理內(nèi)的所有描述符進行I0事件監(jiān)控,有描述符事件就緒后,通過描述符對應的Channel進行事件處理。
2.所有就緒的描述符IO事件處理完畢后,對任務隊列中的所有操作順序進行執(zhí)行。
3.由于epoll的事件監(jiān)控,有可能會因為沒有事件到來而持續(xù)阻塞,導致任務隊列中的任務不能及時得到執(zhí)行,因此創(chuàng)建了eventfd,添加到Poller的事件監(jiān)控中,用于實現(xiàn)每次向任務隊列添加任務的時候,通過向eventfd寫入數(shù)據(jù)來喚醒epoll的阻塞。
TcpServer模塊:
這個模塊是一個整體Tcp服務器模塊的封裝,內(nèi)部封裝了Acceptor模塊,EventLoopThreadPool模塊。
●TcpServer中包含有一個EventLoop對象:以備在超輕量使用場景中不需要EventLoop線程池, 只需要在主線程中完成所有操作的情況。
●TcpServer模塊內(nèi)部包含有-個EventL oopThreadPool對象: 其實就是EventLoop線程池,也就是子Reactor線程池
●TcpServer模塊內(nèi)部包含有-個Acceptor對象: 一個TcpServer服務器, 必然對應有- -個監(jiān)聽套接字,能夠完成獲取客戶端新連接,并處理的任務。
TcpServer模塊內(nèi)部包含有一個std::shared_ ptr<Connection>的hash表: 保存了所有的新建連接對應的Connection,注意,所有的Connection使用shared_ ptr進行管理,這樣能夠保證在hash表中刪除了Connection信息后,在shared_ptr計數(shù)器 為0的情況下完成對Connection資源的釋放操作。
具體操作流程如下:
1.在實例化TcpServer對象過程中,完成BaseLoop的設置, Acceptor對 象的實例化,以及EventLoop線程池的實例化,以及std::shared_ ptr<Connection>的hash表的實例化。
2.為Acceptor對象設置回調(diào)函數(shù):獲取到新連接后,為新連接構建Connection對象,設置
Connection的各項回調(diào),并使用shared_ptr進行管理,并添加到hash表中進行管理,并為
Connection選擇一個EventLoop線程 ,為Connection添加一個定時銷毀任務,為Connection添加事件監(jiān)控
3.啟動BaseLoop。
模塊關系圖:
4.2HTTP協(xié)議模塊:
HTTP協(xié)議模塊用于對高并發(fā)服務器模塊進行協(xié)議支持,基于提供的協(xié)議支持能夠更方便的完成指定協(xié)議服務器的搭建。
而HTTP協(xié)議支持模塊的實現(xiàn),可以細分為以下幾個模塊。
Util模塊:
這個模塊是一個工具模塊,主要提供HTTP協(xié)議模塊所用到的一些工具函數(shù),比如url編解碼,文件讀,寫...等。
HttpRequest模塊:
這個模塊是HTTP請求數(shù)據(jù)模塊,用于保存HTTP請求數(shù)據(jù)被解析后的各項請求元素信息。
HttpResponse模塊:
這個模塊是HTTP響應數(shù)據(jù)模塊,用于業(yè)務處理后設置并保存HTTP響應數(shù)據(jù)的的各項元素信息,最終會被按照HTTP協(xié)議響應格式組織成為響應信息發(fā)送給客戶端。?
HttpContext模塊:
這個模塊是一個HTTP請求接收的上下文模塊,主要是為了防止在一次接收的數(shù)據(jù)中,不是一個完整的HTTP請求,則解析過程并未完成,無法進行完整的請求處理,需要在下次接收到新數(shù)據(jù)后繼續(xù)根據(jù)上下文進行解析,最終得到一個HttpRequest請求信息對象,因此在請求數(shù)據(jù)的接收以及解析部分需要一個上下文來進行控制接收和處理節(jié)奏。
HttpServer模塊:
這個模塊是最終給組件使用者提供的HTTP服務器模塊了,用于以簡單的接口實現(xiàn)HTTP服務器的搭建。
HttpServer模塊內(nèi)部包含有一個TcpServer對象: TcpServer對象實現(xiàn)服務器的搭建
HttpServer模塊內(nèi)部包含有兩個提供給TcpServer對象的接口:連接建立成功設置上下文接口,數(shù)據(jù)處理接口。
HttpServer模塊內(nèi)部包含有一個hash-map表存儲請求與處理函數(shù)的映射表:組件使用者向
HttpServer設置哪些請求應該使用哪些函數(shù)進行處理,等TcpServer收到對應的請求就會使用對應的函數(shù)進行處理。
5.簡單的秒級定時任務實現(xiàn)
在當前的?并發(fā)服務器中,我們不得不考慮?個問題,那就是連接的超時關閉問題。我們需要避免?個連接?時間不通信,但是也不關閉,空耗資源的情況。
這時候我們就需要?個定時任務,定時的將超時過期的連接進?釋放。
5.1Linux提供給我們的定時器
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
clockid: CLOCK_REALTIME-系統(tǒng)實時時間,如果修改了系統(tǒng)時間就會出問題;
CLOCK_MONOTONIC-從開機到現(xiàn)在的時間是?種相對時間;
flags: 0-默認阻塞屬性
int timerfd_settime(int fd, int flags, struct itimerspec *new, struct
itimerspec *old);
fd: timerfd_create返回的?件描述符
flags: 0-相對時間, 1-絕對時間;默認設置為0即可.
new: ?于設置定時器的新超時時間
old: ?于接收原來的超時時間
struct timespec {
time_t tv_sec; /* Seconds */
long tv_nsec; /* Nanoseconds */
};
struct itimerspec {
struct timespec it_interval; /* 第?次之后的超時間隔時間 */
struct timespec it_value; /* 第?次超時時間 */
};
定時器會在每次超時時,自動給fd中寫?8字節(jié)的數(shù)據(jù),表?在上?次讀取數(shù)據(jù)到當前讀取數(shù)據(jù)期間超時了多少次。
實例:
#include<stdio.h>
#include<unistd.h>
#include<stdint.h>
#include<fcntl.h>
#include<sys/timerfd.h>
int main()
{
//1.創(chuàng)建一個定時器:
int timerfd = timerfd_create(CLOCK_MONOTONIC,0);
if(timerfd < 0) {
perror("create fail!");
return -1;
}
//2.啟動定時器:
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;//第一次超時時間為1s后
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0; //第一次超時后,每次超時的間隔時
// 這個定時器描述符將每隔1秒都會觸發(fā)?次可讀事件
timerfd_settime(timerfd,0,&itime,NULL);
while(1) {
uint64_t times;
int ret = read(timerfd,×,sizeof(times));
if(ret < 0) {
perror("read error!");
return -1;
}
printf("超時了,距離上一次超時了%ld次\n", times);
}
close(timerfd);
return 0;
}
[myl@VM-8-12-centos example]$ ./timerfd
超時了,距離上一次超時了1次
超時了,距離上一次超時了1次
超時了,距離上一次超時了1次
上邊例子,是?個定時器的使用實例,是每隔1s鐘觸發(fā)?次定時器超時,否則就會阻塞在read讀取數(shù)據(jù)這里?;谶@個例?,則我們可以實現(xiàn)每隔1s,檢測?下哪些連接超時了,然后將超時的連接釋放掉。
5.2時間輪思想:
上述的例子,存在?個很?的問題,每次超時都要將所有的連接遍歷?遍,如果有上萬個連接,效率?疑是較為低下的。這時候?家就會想到,我們可以針對所有的連接,根據(jù)每個連接最近?次通信的系統(tǒng)時間建立?個小根堆,這樣只需要每次針對堆頂部分的連接逐個釋放,直到?jīng)]有超時的連接為止,這樣也可以大大提高處理的效率。
上述方法可以實現(xiàn)定時任務,但是這?給?家介紹另?種方案:時間輪
時間輪的思想來源于鐘表,如果我們定了?個3點鐘的鬧鈴,則當時針走到3的時候,就代表時間到
了。
同樣的道理,如果我們定義了?個數(shù)組,并且有?個指針,指向數(shù)組起始位置,這個指針每秒鐘向后走動?步,?到哪里,則代表哪里的任務該被執(zhí)?了,那么如果我們想要定?個3s后的任務,則只需要將任務添加到tick+3位置,則每秒中走一步,三秒鐘后tick?到對應位置,這時候執(zhí)行對應位置的任務即可。
但是,同?時間可能會有?批量的定時任務,因此我們可以給數(shù)組對應位置下拉?個數(shù)組,這樣就可以在同?個時刻上添加多個定時任務了。
存在的問題:
當某個任務在超時時間內(nèi)收到了一次數(shù)據(jù)請求,此時就需要延長定期時間
解決方式:使用智能指針 + 析構函數(shù)
將定時任務封裝成一個類,類實例化的每一個對象,就是一個定時任務對象,當對象銷毀的時候,再去執(zhí)行定時任務(將定時任務的執(zhí)行放到析構函數(shù)中)
shared_ptr用于對new的對象進行空間管理,當shared_ptr對對象進行管理的時候,內(nèi)部有一個計數(shù)器,當計數(shù)器為0的時候,則釋放所管理的對象
收到新的數(shù)據(jù)請求之后又構建一個shared_ptr對象加入到數(shù)組中,此時計數(shù)器變?yōu)?,進而達到了延長定時任務的效果。
實例:
#include<iostream>
#include<vector>
#include<functional>
#include<unordered_map>
#include<memory>
#include<unistd.h>
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
uint64_t _id; //定時器任務對象ID
uint32_t _timeout; //定時任務的超時時間
bool _canceled; //false表示沒有取消 true表示被取消
TaskFunc _task_cb; //定時器對象要執(zhí)行的定時任務
ReleaseFunc _release; //用于刪除TimerWheel中保存的定時器對象信息
public:
TimerTask(uint64_t id,uint32_t delay,const TaskFunc& cb)
:_id(id),_timeout(delay),_task_cb(cb),_canceled(false) {}
void SetRelease(const ReleaseFunc& cb) {_release = cb;}
uint32_t DelayTime() {return _timeout;}
void Cancel() {_canceled = true;}
~TimerTask() {
if(_canceled == false)
_task_cb();
_release();
}
};
class TimerWheel
{
private:
//解決出現(xiàn)不同的智能指針管理同一個對象,當一個智能指針釋放該對象之后,另一個智能指針管理一個空的對象
//所以使用weak_ptr和id關聯(lián)起來,每次使用智能指針管理對象的時候都能找到該對象被哪個智能指針對象管理
//只需要引用計數(shù)增加,而不是再使用一個智能指針對象進行管理,weak_ptr的特點是管理對象資源,但是不增加
//引用計數(shù)
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; //當前的秒針,走到哪里釋放哪里,釋放哪里就相當于執(zhí)行哪里的任務
int _capacity; //表盤的最大數(shù)量,也就是最大延遲時間
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<uint64_t,WeakTask> _timers;
private:
void RemoveTimer(uint64_t id) {
auto it = _timers.find(id);
if(it != _timers.end())
_timers.erase(it);
}
public:
TimerWheel():_capacity(60),_tick(0),_wheel(_capacity){}
//1.添加定時任務:
void TimerAdd(uint64_t id,uint32_t delay,const TaskFunc& cb) {
PtrTask pt(new TimerTask(id,delay,cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
//2.刷新和延遲定時任務:
void TimerRefresh(uint64_t id) {
//通過保存的定時器對象的weak_ptr構造一個shared_ptr出來,添加到輪子中
auto it = _timers.find(id);
if(it == _timers.end()) return;
PtrTask pt = it->second.lock(); //lock()獲取weak_ptr中管理的對象對應的shared_ptr;
int delay = pt->DelayTime();
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
}
//3.這個函數(shù)應該每秒鐘被執(zhí)行一次,相當于秒鐘向后走了一步
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
//清空指定位置的數(shù)組,就會把數(shù)組中保存的所有管理定時器對象的shared_ptr釋放掉
_wheel[_tick].clear();
}
//4.取消定時任務:
void TimerCancel(uint64_t id) {
auto it = _timers.find(id);
if(it == _timers.end()) return;
PtrTask pt = it->second.lock();
if(pt) pt->Cancel();
}
};
//測試:
class Test
{
public:
Test() {std::cout << "構造" << std::endl;}
~Test() {std::cout << "析構" << std::endl;}
};
void DelTest(Test* t) {
delete t;
}
int main()
{
TimerWheel tw;
Test* t = new Test();
tw.TimerAdd(111,5,std::bind(DelTest,t));
for(int i = 0; i < 5; ++i) {
sleep(1);
//刷新定時任務
tw.TimerRefresh(111);
//向后移動秒針:
tw.RunTimerTask();
printf("刷新了一下定時任務,重新需要%d鐘后才會銷毀\n",5-i);
}
// tw.TimerCancel(111);
while(1)
{
std::cout << "---------" << std::endl;
sleep(1);
tw.RunTimerTask();
}
return 0;
}
[myl@VM-8-12-centos example]$ ./timewheel
構造
刷新了一下定時任務,重新需要5鐘后才會銷毀
刷新了一下定時任務,重新需要4鐘后才會銷毀
刷新了一下定時任務,重新需要3鐘后才會銷毀
刷新了一下定時任務,重新需要2鐘后才會銷毀
刷新了一下定時任務,重新需要1鐘后才會銷毀
---------
---------
---------
---------
析構
6.正則庫的簡單使用
正則表達式(regular expression)描述了?種字符串匹配的模式(pattern),可以用來檢查?個串是否含有某種?串、將匹配的子串替換或者從某個串中取出符合某個條件的子串等。
正則表達式的使用,可以使得HTTP請求的解析更加簡單(這?指的時程序員的?作變得的簡單,這并不代表處理效率會變高,實際上效率上是低于直接的字符串處理的),使我們實現(xiàn)的HTTP組件庫使用起來更加靈活。
實例:
#include<iostream>
#include<string>
#include<regex>
int main()
{
std::string str = "/numbers/1234";
//匹配以"/numbers/"為起始,后邊跟一個或者多個數(shù)字字符的字符串,并且在匹配的過程中
//提取匹配到的這個數(shù)字字符串
std::regex e("/numbers/(\\d+)");
std::smatch matches;
bool ret = std::regex_match(str,matches,e);
if(ret == false) return -1;
for(auto& e: matches)
std::cout << e << std::endl;
return 0;
}
[myl@VM-8-12-centos example]$ ./regex
/numbers/1234
1234
#include<iostream>
#include<string>
#include<regex>
int main()
{
//HTTP請求行格式: GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1\r\n
std::string str = "GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1\r\n";
std::smatch matches;
//請求方法的匹配 GET HEAD POST PUT DELETE ....
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?"
,std::regex::icase);
//(GET|HEAD|POST|PUT|DELETE):表示匹配任意一個字符串
//([^?]*):[^?]匹配非問號字符,后邊的*表示0次或者是多次
//\\?(.*) \\? 表示原始的?字符 (.*)表示提取?之后的任意字符0次或多次,知道遇到空格
//HTTP/1\\.[01] 表示匹配以HTTP/1.開始,后邊有個0或1的字符串
//(?:\n|\r\n)? (?: ...) 表示匹配某個格式字符串,但是不提取, 最后的?表示的是匹配前邊的表達式0次或1次
bool ret = std::regex_match(str,matches,e);
if(ret == false) return -1;
for(auto& e: matches)
std::cout << e << std::endl;
return 0;
}
[myl@VM-8-12-centos example]$ ./request
GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1
GET
/baidu/login
user=xiaoming&pass=123123
HTTP/1.1
7.通用類型any類型的實現(xiàn)
每?個Connection對連接進?管理,最終都不可避免需要涉及到應?層協(xié)議的處理,因此在
Connection中需要設置協(xié)議處理的上下?來控制處理節(jié)奏。但是應?層協(xié)議千千萬,為了降低耦合
度,這個協(xié)議接收解析上下?就不能有明顯的協(xié)議傾向,它可以是任意協(xié)議的上下?信息,因此就需要?個通?的類型來保存各種不同的數(shù)據(jù)結構。
在C語言中,通?類型可以使?void*來管理,但是在C++中,boost庫和C++17給我們提供了?個通?類型any來靈活使?,如果考慮增加代碼的移植性,盡量減少第三?庫的依賴,則可以使?C++17特性中的any,或者自己來實現(xiàn)。而這個any通?類型類的實現(xiàn)其實并不復雜,以下是簡單的部分實現(xiàn)。
1.是一個容器,容器中可以保存各種不同類型的數(shù)據(jù)
解決方式:
a.模板:
template<class T>
class Any
{
private:
T _content;
};
存在的問題:
實例化對象的時候,必須指定容器保存的數(shù)據(jù)類型:Any<int> a;
而我們需要的是: Any a; a = 10
b.嵌套一下,設計一個類,專門用于保存其它類型的數(shù)據(jù),而Any類保存的是固定類的對象
class Any
{
private:
class hlder
{……};
template<class T>
class placeholder : holder
{
T _val;
};
holder* _content;
}
Any類中,保存的是holder類的指針,當Any容器需要保存一個數(shù)據(jù)的時候,只需要通過palceholder子類實例化一個特定類型的子類對象出來,讓子類對象保存數(shù)據(jù),采用多態(tài)的思想,父類指針指向子類的時候調(diào)用子類的方法
Any類模擬實現(xiàn):
#include <iostream>
#include <typeinfo>
#include <cassert>
#include <unistd.h>
#include <any>
class Any
{
private:
class holder
{
public:
virtual ~holder() {}
virtual const std::type_info& type() = 0;
virtual holder* clone() = 0;
};
template<class T>
class placeholder : public holder
{
public:
placeholder(const T& val) : _val(val) {}
//獲取子類對象保存的數(shù)據(jù)類型:
virtual const std::type_info& type() {
return typeid(T);
}
//針對當前的對象自身,克隆出一個新的子類對象來
virtual holder* clone() {
return new placeholder(_val);
}
public:
T _val;
};
holder* _content;
public:
Any(): _content(nullptr) {}
template<class T>
Any(const T& val) : _content(new placeholder<T>(val)) {}
Any(const Any& other) : _content(other._content ? other._content->clone() : NULL) {}
~Any() {delete _content;}
Any& swap(Any& other) {
std::swap(_content,other._content);
return *this;
}
//返回子類對象保存的數(shù)據(jù)指針
template<class T>
T* get() {
//獲取想要的數(shù)據(jù)類型,必須和保存的數(shù)據(jù)類型一致
assert(typeid(T) == _content->type());
return &((placeholder<T>*)_content)->_val;
}
//賦值運算符的重載函數(shù):
template<class T>
Any& operator=(const T& val) {
//為val構造一個臨時的通用容器,然后與當前容器自身進行指針交換,臨時對象釋放的時候,
//原先保存的數(shù)據(jù)也就被釋放
Any(val).swap(*this);
return *this;
}
Any& operator=(const Any& other) {
Any(other).swap(*this);
return *this;
}
};
//測試:
int main()
{
Any a;
a = 10;
int* pa = a.get<int>();
std::cout << *pa << std::endl;
return 0;
}
[myl@VM-8-12-centos example]$ ./any
10
8.日志宏的實現(xiàn)
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL INF
#define LOG(level, format, ...) do{\
if (level < LOG_LEVEL) break;\
time_t t = time(NULL);\
struct tm *ltm = localtime(&t);\
char tmp[32] = {0};\
strftime(tmp, 31, "%H:%M:%S", ltm);\
fprintf(stdout, "[%p %s %s:%d] " format "\n",(void*)pthread_self(),tmp, __FILE__, __LINE__, ##__VA_ARGS__);\
}while(0)
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
9.緩沖區(qū)buffer類的實現(xiàn)
提供的功能:存儲數(shù)據(jù),取出數(shù)據(jù)
實現(xiàn)思想:
1.實現(xiàn)緩沖區(qū)得要有-塊空間,采用vector < char> ,vector底層采用的是一塊線性的內(nèi)存空間
2.要素:
a.默認的空間大小
b.當前讀取數(shù)據(jù)位置
c.當前的寫入數(shù)據(jù)位置
3.操作:
a.寫入數(shù)據(jù):
當前寫入位置指向哪里,就從哪里開始寫入,如果后續(xù)空閑空間不夠了,考慮整齊緩沖區(qū)空閑空間是否足夠
足夠:將數(shù)據(jù)移動到起始位置即呵
不夠:擴容,從當前寫位開始擴容足夠大小.
數(shù)據(jù)一旦寫入成功,當前寫位置就要向后偏移
b.讀取數(shù)據(jù):
當前的讀取位置指向哪里,就從哪里開始讀取,前提是有數(shù)據(jù)可讀
可讀數(shù)據(jù)大小:當前寫入位置減去當前讀取位置
實現(xiàn)緩沖區(qū)類該如何設計:?
class Buffer
{
private:
std:vector<char>_ buffer;
//位置,是一個相對偏移量,而不是絕對地址
uint64_ t_ _read_ idx; //讀位置
uint64_ t_ write_ idx; /寫位置
public:
1.獲取當前寫位置的地址
2.確??蓪懣臻g足夠(移動+擴容)
3.獲取前沿空閑空間大小
4.獲取后沿空間空間大小
5.將讀寫位置向后移動指定長度
6.獲取當前讀位置地址
7.獲取可讀數(shù)據(jù)大小
8.將讀位置向后移動指定長度
9.清理功能
};
實例代碼:
class Buffer
{
private:
std::vector<char> _buffer; //使用vector進行內(nèi)存空間管理
uint64_t _read_idx; //讀偏移
uint64_t _write_idx; //寫偏移
public:
Buffer() : _read_idx(0),_write_idx(0),_buffer(BUFFER_DEFAULT_SIZE) {}
char* Begin() {return &*_buffer.begin();}
//獲取當前寫入起始地址, _buffer的空間起始地址,加上寫偏移量
char *WritePosition() { return Begin() + _write_idx; }
//獲取當前讀取起始地址
char *ReadPosition() { return Begin() + _read_idx; }
//獲取緩沖區(qū)末尾空閑空間大小--寫偏移之后的空閑空間, 總體空間大小減去寫偏移
uint64_t TailIdleSize() { return _buffer.size() - _write_idx; }
//獲取緩沖區(qū)起始空閑空間大小--讀偏移之前的空閑空間
uint64_t HeadIdleSize() { return _read_idx; }
//獲取可讀數(shù)據(jù)大小 = 寫偏移 - 讀偏移
uint64_t ReadAbleSize() { return _write_idx - _read_idx; }
//將讀偏移向后移動
void MoveReadOffset(uint64_t len) {
if (len == 0) return;
//向后移動的大小,必須小于可讀數(shù)據(jù)大小
assert(len <= ReadAbleSize());
_read_idx += len;
}
//將寫偏移向后移動
void MoveWriteOffset(uint64_t len) {
//向后移動的大小,必須小于當前后邊的空閑空間大小
assert(len <= TailIdleSize());
_write_idx += len;
}
//確??蓪懣臻g足夠(整體空閑空間夠了就移動數(shù)據(jù),否則就擴容)
void EnsureWriteSpace(uint64_t len) {
//如果末尾空閑空間大小足夠,直接返回
if (TailIdleSize() >= len) { return; }
//末尾空閑空間不夠,則判斷加上起始位置的空閑空間大小是否足夠, 夠了就將數(shù)據(jù)移動到起始位置
if (len <= TailIdleSize() + HeadIdleSize()) {
//將數(shù)據(jù)移動到起始位置
uint64_t rsz = ReadAbleSize();//把當前數(shù)據(jù)大小先保存起來
std::copy(ReadPosition(), ReadPosition() + rsz, Begin());//把可讀數(shù)據(jù)拷貝到起始位置
_read_idx = 0; //將讀偏移歸0
_write_idx = rsz; //將寫位置置為可讀數(shù)據(jù)大小, 因為當前的可讀數(shù)據(jù)大小就是寫偏移量
}else {
//總體空間不夠,則需要擴容,不移動數(shù)據(jù),直接給寫偏移之后擴容足夠空間即可
_buffer.resize(_write_idx + len);
}
}
//寫入數(shù)據(jù)
void Write(const void *data, uint64_t len) {
//1. 保證有足夠空間,2. 拷貝數(shù)據(jù)進去
if (len == 0) return;
EnsureWriteSpace(len);
const char *d = (const char *)data;
std::copy(d, d + len, WritePosition());
}
void WriteAndPush(const void *data, uint64_t len) {
Write(data, len);
MoveWriteOffset(len);
}
void WriteString(const std::string &data) {
return Write(data.c_str(), data.size());
}
void WriteStringAndPush(const std::string &data) {
WriteString(data);
MoveWriteOffset(data.size());
}
void WriteBuffer(Buffer &data) {
return Write(data.ReadPosition(), data.ReadAbleSize());
}
void WriteBufferAndPush(Buffer &data) {
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
//讀取數(shù)據(jù)
void Read(void *buf, uint64_t len) {
//要求要獲取的數(shù)據(jù)大小必須小于可讀數(shù)據(jù)大小
assert(len <= ReadAbleSize());
std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);
}
void ReadAndPop(void *buf, uint64_t len) {
Read(buf, len);
MoveReadOffset(len);
}
std::string ReadAsString(uint64_t len) {
//要求要獲取的數(shù)據(jù)大小必須小于可讀數(shù)據(jù)大小
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
std::string ReadAsStringAndPop(uint64_t len) {
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
char *FindCRLF() {
//找到'\n'的位置
char *res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());
return res;
}
/*通常獲取一行數(shù)據(jù),這種情況針對是*/
std::string GetLine() {
char *pos = FindCRLF();
if (pos == NULL) {
return "";
}
// +1是為了把換行字符也取出來。
return ReadAsString(pos - ReadPosition() + 1);
}
std::string GetLineAndPop() {
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
//清空緩沖區(qū)
void Clear() {
//只需要將偏移量歸0即可
_read_idx = 0;
_write_idx = 0;
}
};
10.套接字Socket類實現(xiàn)
創(chuàng)建套接字
綁定地址信息
開始監(jiān)聽
向服務器發(fā)起連接
獲取新連接
接受數(shù)據(jù)
發(fā)送數(shù)據(jù)
關閉套接字
創(chuàng)建一個服務 端連接
創(chuàng)建一個客戶端連接
設置套接字選項—開啟地址端口用 ->?主動斷開連接的一方會進入time wait狀態(tài),此時就會出現(xiàn)綁定端口號失敗的情況,所以開啟地址端口重用,重新綁定端口號
設置套接字阻塞屬性--設置為非阻塞
class NetWork {
public:
NetWork() {
DBG_LOG("SIGPIPE INIT");
signal(SIGPIPE, SIG_IGN);
}
};
/*避免服務器因為給斷開連接的客戶端進?send觸發(fā)異常導致程序崩潰,因此忽略SIGPIPE信號*/
/*定義靜態(tài)全局是為了保證構造函數(shù)中的信號忽略處理能夠在程序啟動階段就被直接執(zhí)?*/
static NetWork nw;
#define MAX_LISTEN 1024
class Socket
{
private:
int _sockfd;
public:
Socket() :_sockfd(-1) {}
Socket(int fd) : _sockfd(fd) {}
~Socket() {Close();}
//創(chuàng)建套接字 -> int socket(int domain, int type, int protocol)
int Fd() {return _sockfd;}
bool Create() {
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0) {
ERR_LOG("CREATE SOCKET FAILED!!");
return false;
}
return true;
}
//綁定地址信息
bool Bind(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int bind(int sockfd, struct sockaddr*addr, socklen_t len);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
ERR_LOG("BIND ADDRESS FAILED!");
return false;
}
return true;
}
//開始監(jiān)聽 backlog:全連接隊列的大小
bool Listen(int backlog = MAX_LISTEN) {
// int listen(int backlog)
int ret = listen(_sockfd, backlog);
if (ret < 0) {
ERR_LOG("SOCKET LISTEN FAILED!");
return false;
}
return true;
}
//向服務器發(fā)起連接
bool Connect(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int connect(int sockfd, struct sockaddr*addr, socklen_t len);
int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
ERR_LOG("CONNECT SERVER FAILED!");
return false;
}
return true;
}
//獲取新連接
int Accept() {
// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);
int newfd = accept(_sockfd, NULL, NULL);
if (newfd < 0) {
ERR_LOG("SOCKET ACCEPT FAILED!");
return -1;
}
return newfd;
}
//接收數(shù)據(jù)
ssize_t Recv(void *buf, size_t len, int flag = 0) {
// ssize_t recv(int sockfd, void *buf, size_t len, int flag);
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0) {
//EAGAIN 當前socket的接收緩沖區(qū)中沒有數(shù)據(jù)了,在非阻塞的情況下才會有這個錯誤
//EINTR 表示當前socket的阻塞等待,被信號打斷了,
if (errno == EAGAIN || errno == EINTR) {
return 0;//表示這次接收沒有接收到數(shù)據(jù)
}
ERR_LOG("SOCKET RECV FAILED!!");
return -1;
}
return ret; //實際接收的數(shù)據(jù)長度
}
ssize_t NonBlockRecv(void *buf, size_t len) {
return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示當前接收為非阻塞。
}
//發(fā)送數(shù)據(jù)
ssize_t Send(const void *buf, size_t len, int flag = 0) {
// ssize_t send(int sockfd, void *data, size_t len, int flag);
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) {
return 0;
}
ERR_LOG("SOCKET SEND FAILED!!");
return -1;
}
return ret;//實際發(fā)送的數(shù)據(jù)長度
}
ssize_t NonBlockSend(void *buf, size_t len) {
if (len == 0) return 0;
return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示當前發(fā)送為非阻塞。
}
//關閉套接字
void Close() {
if (_sockfd != -1) {
close(_sockfd);
_sockfd = -1;
}
}
//創(chuàng)建一個服務端連接
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {
//1. 創(chuàng)建套接字,2. 綁定地址,3. 開始監(jiān)聽,4. 設置默認是阻塞, 5. 啟動地址重用
if (Create() == false) return false;
if (block_flag) NonBlock();
if (Bind(ip, port) == false) return false;
if (Listen() == false) return false;
ReuseAddress();
return true;
}
//創(chuàng)建一個客戶端連接
bool CreateClient(uint16_t port, const std::string &ip) {
//1. 創(chuàng)建套接字,2.指向連接服務器
if (Create() == false) return false;
if (Connect(ip, port) == false) return false;
return true;
}
//設置套接字選項---開啟地址端口重用
void ReuseAddress() {
// int setsockopt(int fd, int leve, int optname, void *val, int vallen)
int val = 1; //SO_REUSEADDR:設置地址重用
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));
val = 1; //SO_REUSEPORT :設置端口號重用
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));
}
//設置套接字阻塞屬性-- 設置為非阻塞
void NonBlock() {
//int fcntl(int fd, int cmd, ... /* arg */ );
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
};
11.Channel模塊實現(xiàn)
目的:對描述符的監(jiān)控時間管理
功能:
1 .事件管理:
描述符是否可讀
描述符是否可寫
對描述符監(jiān)控可讀
對描述符監(jiān)控可寫
解除可讀事件監(jiān)控
解除可寫事件監(jiān)控
解除所有事件監(jiān)控
2.事件出發(fā)后的處理的管理
要處理的事件:可讀,可寫,掛斷,錯誤,任意
事件處理的回調(diào)函數(shù)成員:
EPOLLIN:可讀.
EPOLLOUT:可寫
EPOLLRDHUP:連接斷開
EPOLLPRI:優(yōu)先數(shù)據(jù)
EPOLLERR:出錯了
EPOLLHUP:掛斷
事件處理,因為有五種事件要處理,就需要五個回調(diào)函數(shù)
class Poller;
class EventLoop;
class Channel
{
private:
int _fd;
EventLoop *_loop;
uint32_t _events; // 當前需要監(jiān)控的事件
uint32_t _revents; // 當前連接觸發(fā)的事件
using EventCallback = std::function<void()>;
EventCallback _read_callback; //可讀事件被觸發(fā)的回調(diào)函數(shù)
EventCallback _write_callback; //可寫事件被觸發(fā)的回調(diào)函數(shù)
EventCallback _error_callback; //錯誤事件被觸發(fā)的回調(diào)函數(shù)
EventCallback _close_callback; //連接斷開事件被觸發(fā)的回調(diào)函數(shù)
EventCallback _event_callback; //任意事件被觸發(fā)的回調(diào)函數(shù)
public:
Channel(EventLoop *loop,int fd):_fd(fd), _events(0), _revents(0),_loop(loop) {}
int Fd() { return _fd; }
uint32_t Events() { return _events; }//獲取想要監(jiān)控的事件
void SetREvents(uint32_t events) { _revents = events; }//設置實際就緒的事件
void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
//當前是否監(jiān)控了可讀
bool ReadAble() { return (_events & EPOLLIN); }
//當前是否監(jiān)控了可寫
bool WriteAble() { return (_events & EPOLLOUT); }
//啟動讀事件監(jiān)控
void EnableRead() { _events |= EPOLLIN; Update(); }
//啟動寫事件監(jiān)控
void EnableWrite() { _events |= EPOLLOUT; Update(); }
//關閉讀事件監(jiān)控
void DisableRead() { _events &= ~EPOLLIN; Update(); }
//關閉寫事件監(jiān)控
void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
//關閉所有事件監(jiān)控
void DisableAll() { _events = 0; Update(); }
//移除監(jiān)控
void Remove();
void Update();
//事件處理,一旦連接觸發(fā)了事件,就調(diào)用這個函數(shù),自己觸發(fā)了什么事件如何處理自己決定
void HandleEvent() { //EPOLLRDHUP:半連接 EPOLLPRI:帶外數(shù)據(jù)
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
/*不管任何事件,都調(diào)用的回調(diào)函數(shù)*/
if (_read_callback) _read_callback();
}
/*有可能會釋放連接的操作事件,一次只處理一個*/
if (_revents & EPOLLOUT) {
if (_write_callback) _write_callback();
}else if (_revents & EPOLLERR) {
if (_error_callback) _error_callback();//一旦出錯,就會釋放連接,因此要放到前邊調(diào)用任意回調(diào)
}else if (_revents & EPOLLHUP) {
if (_close_callback) _close_callback();
}
if (_event_callback) _event_callback();
}
};
13.Poller模塊實現(xiàn)
描述符IO事件監(jiān)控模塊
意義:通過epoll實現(xiàn)對描述符的I0事件監(jiān)控
功能:
1.添加/修改描述符的事件監(jiān)控(不存在則添加,存在則修改)
2.移除描述符的事件監(jiān)控
封裝思想:
1.必須擁有一個epoll的操作句柄
2.擁有一個struct epoll_ event結構體數(shù)組,監(jiān)控是保存所有的活躍事件
3.使用hash表管理描述符于描述符對應的事件管理Channel對象
邏輯流程:
1.對描述符進行監(jiān)控,通過Channel才 能知道描述符需要監(jiān)控什么事件
2.當描述符就緒了,通過描述符在hash表中找到對應的Channel(得到了Channel才能知道什么事件該如何處理)當描述符就緒了,返回就緒描述符對應的Channel
類的設計:?
class Poller
{
private:
int_ epfd;
struct epoll_ event_ evs[X]
std::unordered_ map< int,Channel*>
private:
1判斷要更新事件的描述符是否存在
2.針對epoll直接操作(添加,修改,刪除)
public:
1.添加或更行描述符所監(jiān)控的事件
2.移除描述符的監(jiān)控
3.開始監(jiān)控,獲取就緒的Channel
};
代碼實現(xiàn):
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
private:
//對epoll的直接操作
void Update(Channel *channel, int op) {
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
int fd = channel->Fd();
struct epoll_event ev;
ev.data.fd = fd;
ev.events = channel->Events();
int ret = epoll_ctl(_epfd, op, fd, &ev);
if (ret < 0) {
ERR_LOG("EPOLLCTL FAILED!");
}
return;
}
//判斷一個Channel是否已經(jīng)添加了事件監(jiān)控
bool HasChannel(Channel *channel) {
auto it = _channels.find(channel->Fd());
if (it == _channels.end()) {
return false;
}
return true;
}
public:
Poller() {
_epfd = epoll_create(MAX_EPOLLEVENTS);
if (_epfd < 0) {
ERR_LOG("EPOLL CREATE FAILED!!");
abort();//退出程序
}
}
//添加或修改監(jiān)控事件
void UpdateEvent(Channel *channel) {
bool ret = HasChannel(channel);
if (ret == false) {
//不存在則添加
_channels.insert(std::make_pair(channel->Fd(), channel));
return Update(channel, EPOLL_CTL_ADD);
}
return Update(channel, EPOLL_CTL_MOD);
}
//移除監(jiān)控
void RemoveEvent(Channel *channel) {
auto it = _channels.find(channel->Fd());
if (it != _channels.end()) {
_channels.erase(it);
}
Update(channel, EPOLL_CTL_DEL);
}
//開始監(jiān)控,返回活躍連接
void Poll(std::vector<Channel*> *active) {
// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout)
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
if (nfds < 0) {
if (errno == EINTR) {
return ;
}
ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));
abort();//退出程序
}
for (int i = 0; i < nfds; i++) {
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
it->second->SetREvents(_evs[i].events);//設置實際就緒的事件
active->push_back(it->second);
}
return;
}
};
14.定時任務管理TimerWheel模塊實現(xiàn)
定時器模塊的整合:
timefd:實現(xiàn)內(nèi)核每隔一段事件,給進程一次超時時間
timewheel:實現(xiàn)每次執(zhí)行Runtimetask,都可以執(zhí)行一波到期的定時任務
要實現(xiàn)-個完整的秒級定時器,就需要將這兩個功能整合到一起,timefd設置為海秒鐘觸發(fā)一次定時事件, 事件被觸發(fā),則運行一次timewheel的runtimetask執(zhí)行一下所有過期定時任務
代碼實現(xiàn) :
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask{
private:
uint64_t _id; // 定時器任務對象ID
uint32_t _timeout; //定時任務的超時時間
bool _canceled; // false-表示沒有被取消, true-表示被取消
TaskFunc _task_cb; //定時器對象要執(zhí)行的定時任務
ReleaseFunc _release; //用于刪除TimerWheel中保存的定時器對象信息
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb):
_id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}
~TimerTask() {
if (_canceled == false) _task_cb();
_release();
}
void Cancel() { _canceled = true; }
void SetRelease(const ReleaseFunc &cb) { _release = cb; }
uint32_t DelayTime() { return _timeout; }
};
class TimerWheel
{
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; //當前的秒針,走到哪里釋放哪里,釋放哪里,就相當于執(zhí)行哪里的任務
int _capacity; //表盤最大數(shù)量---其實就是最大延遲時間
std::vector<std::vector<PtrTask>> _wheel;
std::unordered_map<uint64_t, WeakTask> _timers;
EventLoop *_loop;
int _timerfd;//定時器描述符--可讀事件回調(diào)就是讀取計數(shù)器,執(zhí)行定時任務
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(uint64_t id) {
auto it = _timers.find(id);
if (it != _timers.end()) {
_timers.erase(it);
}
}
static int CreateTimerfd() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0) {
ERR_LOG("TIMERFD CREATE FAILED!");
abort();
}
//int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);
struct itimerspec itime;
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;//第一次超時時間為1s后
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0; //第一次超時后,每次超時的間隔時
timerfd_settime(timerfd, 0, &itime, NULL);
return timerfd;
}
int ReadTimefd() {
uint64_t times;
//有可能因為其他描述符的事件處理花費事件比較長,然后在處理定時器描述符事件的時候,有可能就已經(jīng)超時了很多次
//read讀取到的數(shù)據(jù)times就是從上一次read之后超時的次數(shù)
int ret = read(_timerfd, ×, 8);
if (ret < 0) {
ERR_LOG("READ TIMEFD FAILED!");
abort();
}
return times;
}
//這個函數(shù)應該每秒鐘被執(zhí)行一次,相當于秒針向后走了一步
void RunTimerTask() {
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();//清空指定位置的數(shù)組,就會把數(shù)組中保存的所有管理定時器對象的shared_ptr釋放掉
}
void OnTime() {
//根據(jù)實際超時的次數(shù),執(zhí)行對應的超時任務
int times = ReadTimefd();
for (int i = 0; i < times; i++) {
RunTimerTask();
}
}
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) {
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
_timers[id] = WeakTask(pt);
}
void TimerRefreshInLoop(uint64_t id) {
//通過保存的定時器對象的weak_ptr構造一個shared_ptr出來,添加到輪子中
auto it = _timers.find(id);
if (it == _timers.end()) {
return;//沒找著定時任務,沒法刷新,沒法延遲
}
PtrTask pt = it->second.lock();//lock獲取weak_ptr管理的對象對應的shared_ptr
int delay = pt->DelayTime();
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);
}
void TimerCancelInLoop(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) {
return;//沒找著定時任務,沒法刷新,沒法延遲
}
PtrTask pt = it->second.lock();
if (pt) pt->Cancel();
}
public:
TimerWheel(EventLoop *loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop),
_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)) {
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
_timer_channel->EnableRead();//啟動讀事件監(jiān)控
}
/*定時器中有個_timers成員,定時器信息的操作有可能在多線程中進行,因此需要考慮線程安全問題*/
/*如果不想加鎖,那就把對定期的所有操作,都放到一個線程中進行*/
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
//刷新/延遲定時任務
void TimerRefresh(uint64_t id);
void TimerCancel(uint64_t id);
/*這個接口存在線程安全問題--這個接口實際上不能被外界使用者調(diào)用,只能在模塊內(nèi),在對應的EventLoop線程內(nèi)執(zhí)行*/
bool HasTimer(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) {
return false;
}
return true;
}
};
15.Reactor-EventLoop線程池模塊實現(xiàn)
15.1eventfd介紹
eventfd:一種事件通知機制
創(chuàng)建一個描述符用于實現(xiàn)事件通知
eventfd本質(zhì)在內(nèi)核離邊管理的就是一個計數(shù)器
創(chuàng)建eventfd就會在內(nèi)核中創(chuàng)建一個計數(shù)器結構
每當向eventfd中寫入一個數(shù)值—用于表示事件通知次數(shù)
可以使用read進行數(shù)據(jù)的讀取,讀取到的數(shù)據(jù)就是通知的次數(shù)
假設每次給eventfd中寫入一個1,就表示通知了一次,連續(xù)寫了三次之后,再去read讀取出來的數(shù)字就是3,讀取之后計數(shù)清0
用處:在EventLoop模塊中實現(xiàn)線程間通知機制
#include < sys/eventfd.h>
int eventfd(unsigned int initval,int flags);?
功能:創(chuàng)建一個eventfd對象,實現(xiàn)事件通知
參數(shù):
initval :計數(shù)初值
flags:
EFD_CLOEXEC—禁止進程復制
EFD_NOBLOACK—啟動非阻塞屬性
返回值:返回一個文件描述符用于操作
eventfd也是通過read/write/close進行操作的
注意點: read&write進行IO的時候數(shù)據(jù)只能是一個8字節(jié)的數(shù)據(jù)?
代碼演示:?
#include <stdio.h>
#include <stdint.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/eventfd.h>
int main()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0) {
perror("eventfd failed!!");
return -1;
}
uint64_t val = 1;
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
uint64_t res = 0;
read(efd, &res, sizeof(res));
printf("%ld\n", res);
close(efd);
return 0;
}
運行截圖:?
15.2EventLoop模塊
進行事件監(jiān)控,以及事件處理的模塊
關鍵點:這個模塊與線程是一一對應關聯(lián)的
監(jiān)控了一個連接,而這個連接-旦就緒,就要進行事件處理。
但是如果這個描述符,在多個線程中都觸發(fā)了事件,進行處理,就會存在線程安全的問題
因此我們需要將-個連接的事件監(jiān)控,以及連接事件處理,以及其它操作都放在同一個線程中進行
如何保證一個連接的所有操作都在EventLopp對應的線程中:
解決方案:給EventLoop模塊中,添加一個任務隊列,對連接的所有操作,都進行-次封裝,將對連接
的操作并不直接執(zhí)行,而是當作任務添加到任務隊列中
EventLoop處理流程:
1.在線程中對描述符進行事件監(jiān)控
2.有描述符就緒則對描述符進行事件處理(如何玩保證處理回調(diào)函數(shù)中的操作都在線程中)
3.所有的就緒事件處理完了,這時候再去將任務隊列中的任務執(zhí)行
1.事件監(jiān)控
使用Poller模塊
有事件就緒則進行事件處理
2.執(zhí)行任務隊列中的任務
一個線程安全的任務隊列
注意點:
因為有可能因為等待描述符I0事件就緒,導致執(zhí)行流流程阻塞,這時候任務隊列中的任務將得不到執(zhí)行因此得有一個事件通知的東西,能夠喚醒事件監(jiān)控的阻塞
class EventLoop
{
private:
using Functor = std::function<void()>;
std::thread::id _thread_id;//線程ID
int _event_fd;//eventfd喚醒IO事件監(jiān)控有可能導致的阻塞
std::unique_ptr<Channel> _event_channel;
Poller _poller;//進行所有描述符的事件監(jiān)控
std::vector<Functor> _tasks;//任務池
std::mutex _mutex;//實現(xiàn)任務池操作的線程安全
TimerWheel _timer_wheel;//定時器模塊
public:
//執(zhí)行任務池中的所有任務
void RunAllTask() {
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor);
}
for (auto &f : functor) {
f();
}
return ;
}
static int CreateEventFd() {
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0) {
ERR_LOG("CREATE EVENTFD FAILED!!");
abort();//讓程序異常退出
}
return efd;
}
void ReadEventfd() {
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0) {
//EINTR -- 被信號打斷; EAGAIN -- 表示無數(shù)據(jù)可讀
if (errno == EINTR || errno == EAGAIN) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return ;
}
void WeakUpEventFd() {
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0) {
if (errno == EINTR) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return ;
}
public:
EventLoop():_thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()),
_event_channel(new Channel(this, _event_fd)),
_timer_wheel(this) {
//給eventfd添加可讀事件回調(diào)函數(shù),讀取eventfd事件通知次數(shù)
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
//啟動eventfd的讀事件監(jiān)控
_event_channel->EnableRead();
}
//三步走--事件監(jiān)控-》就緒事件處理-》執(zhí)行任務
void Start() {
while(1) {
//1. 事件監(jiān)控,
std::vector<Channel *> actives;
_poller.Poll(&actives);
//2. 事件處理。
for (auto &channel : actives) {
channel->HandleEvent();
}
//3. 執(zhí)行任務
RunAllTask();
}
}
//用于判斷當前線程是否是EventLoop對應的線程;
bool IsInLoop() {
return (_thread_id == std::this_thread::get_id());
}
void AssertInLoop() {
assert(_thread_id == std::this_thread::get_id());
}
//判斷將要執(zhí)行的任務是否處于當前線程中,如果是則執(zhí)行,不是則壓入隊列。
void RunInLoop(const Functor &cb) {
if (IsInLoop()) {
return cb();
}
return QueueInLoop(cb);
}
//將操作壓入任務池
void QueueInLoop(const Functor &cb) {
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
//喚醒有可能因為沒有事件就緒,而導致的epoll阻塞;
//其實就是給eventfd寫入一個數(shù)據(jù),eventfd就會觸發(fā)可讀事件
WeakUpEventFd();
}
//添加/修改描述符的事件監(jiān)控
void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
//移除描述符的監(jiān)控
void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{ return _timer_wheel.TimerAdd(id, delay, cb); }
void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
//刷新/延遲定時任務
void TimerWheel::TimerRefresh(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id) {
_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}
15.3EventLoop模塊調(diào)用關系圖
15.4EventLoop模塊和線程整合起來
EventLoop模塊與線程是對應的
EventLoop模塊實例化的對象,在構造的時候就會初始化_ _thread_ id
后邊當運行一個操作的時候判斷當前是否運行在EventLoop模塊對應的線程中,就是將線程ID與EventLoop模塊中的thread_ id進行比較,相同就表示在同一個線程,不同就表示當前運行線程并不是EventLoop線程
含義: EventLoop模塊在實例化對象的時候,必須在線程內(nèi)部
EventLoop實例化對象時會設置自己的thread_ id,如果我們先創(chuàng)建了多個EventLoop對象,然后創(chuàng)建了多個線程,將各個線程的id,重新給EventLoop進行設置
存在的問題:在構造EventLoop對象, 到設置新的thread id期間將是不可控的
因此我們必先創(chuàng)建線程,人后在線程的入口函數(shù)中,實例化EventLoop對象
構造一個新的模塊: LoopThread
這個模塊的功能:將EventLoop 與thread整合到一起
思想:
1.創(chuàng)建線程
2.在線程中實例化EventLoop對象
功能:可以向外部返回所實例化的EventLoop
代碼編寫
class LoopThread
{
private:
/*用于實現(xiàn)_loop獲取的同步關系,避免線程創(chuàng)建了,但是_loop還沒有實例化之前去獲取_loop*/
std::mutex _mutex; // 互斥鎖
std::condition_variable _cond; // 條件變量
EventLoop *_loop; // EventLoop指針變量,這個對象需要在線程內(nèi)實例化
std::thread _thread; // EventLoop對應的線程
private:
/*實例化 EventLoop 對象,喚醒_cond上有可能阻塞的線程,并且開始運行EventLoop模塊的功能*/
void ThreadEntry() {
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);//加鎖
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}
public:
/*創(chuàng)建線程,設定線程入口函數(shù)*/
LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
/*返回當前線程關聯(lián)的EventLoop對象指針*/
EventLoop *GetLoop() {
EventLoop *loop = NULL;
{
std::unique_lock<std::mutex> lock(_mutex);//加鎖
_cond.wait(lock, [&](){ return _loop != NULL; });//loop為NULL就一直阻塞
loop = _loop;
}
return loop;
}
};
15.5LoopThread線程池實現(xiàn)
針對LoopThread設計一個線程池:
LoopThreadPoll模塊:對所有的LoopThread進行管理及分配
功能:
1.線程數(shù)量可配置(0個或者多個)
注意事項:在服務器中,主從Reactor模型是主線程只負責新連接獲取,從屬線程負責新連接的事件監(jiān)控及處理因此當前的線程池,有可能從屬線程會數(shù)量為0,也就是實現(xiàn)單Reactor服務器, 一個線程既負責獲取新連接負責連接的處理
2.對所有的線程進行管理,其實就是管理0個或多個LoopThread對象
3.提供線程分配的功能
當主線程獲取了一個新連接,需要將新連接掛到從屬線程上進行事件監(jiān)控及處理
假設有0個從屬線程,則直接分配給主線程的EventLoop, 進行處理
假設有多個從屬線程,則采用RR輪轉思想,進行線程的分配(將對應線程的EventLoop獲取到,設置給對應的Connection)
class LoopThreadPool
{
private:
int _thread_count; //從屬線程的數(shù)量
int _next_idx;
EventLoop *_baseloop;
std::vector<LoopThread*> _threads;
std::vector<EventLoop *> _loops;
public:
LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}
void SetThreadCount(int count) { _thread_count = count; }
void Create() {
if (_thread_count > 0) {
_threads.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++) {
_threads[i] = new LoopThread();
_loops[i] = _threads[i]->GetLoop();
}
}
return ;
}
EventLoop *NextLoop() {
if (_thread_count == 0) {
return _baseloop;
}
_next_idx = (_next_idx + 1) % _thread_count;
return _loops[_next_idx];
}
};
16.Connection模塊實現(xiàn)
目的:對連接進行全方位的管理,對通信連接的所有操作都是通過這個模塊提供的功能完成的
管理:
1.套接字的管理,能夠進行套接字的操作
2.連接事件的管理,可讀,可寫,錯誤,掛斷,任意
3.緩沖區(qū)的管理,便于socket數(shù)據(jù)的接受和發(fā)送
4.協(xié)議嚇文的管理,記錄請求數(shù)據(jù)的處理過程
5.回調(diào)函數(shù)的管理
因為連接接受到數(shù)據(jù)之后該如何處理,要由用戶決定,因此必須有業(yè)務處理回調(diào)函數(shù)
一個連接建立成功后,該如何處理,用戶決定,因此必須有關閉連接回調(diào)函數(shù)
任意事件的產(chǎn)生,有沒有某些處理,由戶決定,因此必須有任意事件的回調(diào)函數(shù)
功能:
1.發(fā)送數(shù)據(jù)--給用戶提供的發(fā)送數(shù)據(jù)接口,钚是真正的發(fā)送接口,而只是把數(shù)據(jù)放到發(fā)送緩沖區(qū),然后啟動寫事件監(jiān)控
2.關閉連接--給用戶提供關閉連接接口,應該在實際釋放連接之前,看看輸入輸出緩沖區(qū)否有數(shù)據(jù)處理
3.啟動非活躍連接的超時銷毀功能
4.取消非活躍連接的超時功能
5.協(xié)議切換--一個連接接受數(shù)據(jù)后如何進行業(yè)務處理,取決于上下文,以及數(shù)據(jù)的業(yè)務處理回調(diào)函數(shù)Connection模塊是對連接的管理模塊,對于連接的所有操作都是通過這個模塊完成的
場景:對連接進行操作的時候,但是連接已經(jīng)釋放,導致內(nèi)存訪問錯誤,最終程序崩潰
解決方案:使用智能指針shared_ ptr對Connection對象進行管理,這樣就能保證任意地方對
Connection對象進行操作的時候,保存了一份shared_ ptr, 因此就算其它地方進行釋放操作
也只是對shared_ ptr的計數(shù)器-1,而不會導致Connection的實際釋放
class Any
{
private:
class holder
{
public:
virtual ~holder() {}
virtual const std::type_info& type() = 0;
virtual holder *clone() = 0;
};
template<class T>
class placeholder: public holder
{
public:
placeholder(const T &val): _val(val) {}
// 獲取子類對象保存的數(shù)據(jù)類型
virtual const std::type_info& type() { return typeid(T); }
// 針對當前的對象自身,克隆出一個新的子類對象
virtual holder *clone() { return new placeholder(_val); }
public:
T _val;
};
holder *_content;
public:
Any():_content(NULL) {}
template<class T>
Any(const T &val):_content(new placeholder<T>(val)) {}
Any(const Any &other):_content(other._content ? other._content->clone() : NULL) {}
~Any() { delete _content; }
Any &swap(Any &other) {
std::swap(_content, other._content);
return *this;
}
// 返回子類對象保存的數(shù)據(jù)的指針
template<class T>
T *get() {
//想要獲取的數(shù)據(jù)類型,必須和保存的數(shù)據(jù)類型一致
assert(typeid(T) == _content->type());
return &((placeholder<T>*)_content)->_val;
}
//賦值運算符的重載函數(shù)
template<class T>
Any& operator=(const T &val) {
//為val構造一個臨時的通用容器,然后與當前容器自身進行指針交換,臨時對象釋放的時候,原先保存的數(shù)據(jù)也就被釋放
Any(val).swap(*this);
return *this;
}
Any& operator=(const Any &other) {
Any(other).swap(*this);
return *this;
}
};
class Connection;
//DISCONECTED -- 連接關閉狀態(tài); CONNECTING -- 連接建立成功-待處理狀態(tài)
//CONNECTED -- 連接建立完成,各種設置已完成,可以通信的狀態(tài); DISCONNECTING -- 待關閉狀態(tài)
typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING}ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:
uint64_t _conn_id; // 連接的唯一ID,便于連接的管理和查找
//uint64_t _timer_id; //定時器ID,必須是唯一的,這塊為了簡化操作使用conn_id作為定時器ID
int _sockfd; // 連接關聯(lián)的文件描述符
bool _enable_inactive_release; // 連接是否啟動非活躍銷毀的判斷標志,默認為false
EventLoop *_loop; // 連接所關聯(lián)的一個EventLoop
ConnStatu _statu; // 連接狀態(tài)
Socket _socket; // 套接字操作管理
Channel _channel; // 連接的事件管理
Buffer _in_buffer; // 輸入緩沖區(qū)---存放從socket中讀取到的數(shù)據(jù)
Buffer _out_buffer; // 輸出緩沖區(qū)---存放要發(fā)送給對端的數(shù)據(jù)
Any _context; // 請求的接收處理上下文
/*這四個回調(diào)函數(shù),是讓服務器模塊來設置的(其實服務器模塊的處理回調(diào)也是組件使用者設置的)*/
/*換句話說,這幾個回調(diào)都是組件使用者使用的*/
using ConnectedCallback = std::function<void(const PtrConnection&)>;
using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection&)>;
using AnyEventCallback = std::function<void(const PtrConnection&)>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
/*組件內(nèi)的連接關閉回調(diào)--組件內(nèi)設置的,因為服務器組件內(nèi)會把所有的連接管理起來,一旦某個連接要關閉*/
/*就應該從管理的地方移除掉自己的信息*/
ClosedCallback _server_closed_callback;
private:
/*五個channel的事件回調(diào)函數(shù)*/
//描述符可讀事件觸發(fā)后調(diào)用的函數(shù),接收socket數(shù)據(jù)放到接收緩沖區(qū)中,然后調(diào)用_message_callback
void HandleRead() {
//1. 接收socket的數(shù)據(jù),放到緩沖區(qū)
char buf[65536];
ssize_t ret = _socket.NonBlockRecv(buf, 65535);
if (ret < 0) {
//出錯了,不能直接關閉連接
return ShutdownInLoop();
}
//這里的等于0表示的是沒有讀取到數(shù)據(jù),而并不是連接斷開了,連接斷開返回的是-1
//將數(shù)據(jù)放入輸入緩沖區(qū),寫入之后順便將寫偏移向后移動
_in_buffer.WriteAndPush(buf, ret);
//2. 調(diào)用message_callback進行業(yè)務處理
if (_in_buffer.ReadAbleSize() > 0) {
//shared_from_this--從當前對象自身獲取自身的shared_ptr管理對象
return _message_callback(shared_from_this(), &_in_buffer);
}
}
//描述符可寫事件觸發(fā)后調(diào)用的函數(shù),將發(fā)送緩沖區(qū)中的數(shù)據(jù)進行發(fā)送
void HandleWrite() {
//_out_buffer中保存的數(shù)據(jù)就是要發(fā)送的數(shù)據(jù)
ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
if (ret < 0) {
//發(fā)送錯誤就該關閉連接了,
if (_in_buffer.ReadAbleSize() > 0) {
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();//這時候就是實際的關閉釋放操作了。
}
_out_buffer.MoveReadOffset(ret);//千萬不要忘了,將讀偏移向后移動
if (_out_buffer.ReadAbleSize() == 0) {
_channel.DisableWrite();// 沒有數(shù)據(jù)待發(fā)送了,關閉寫事件監(jiān)控
//如果當前是連接待關閉狀態(tài),則有數(shù)據(jù),發(fā)送完數(shù)據(jù)釋放連接,沒有數(shù)據(jù)則直接釋放
if (_statu == DISCONNECTING) {
return Release();
}
}
return;
}
//描述符觸發(fā)掛斷事件
void HandleClose() {
/*一旦連接掛斷了,套接字就什么都干不了了,因此有數(shù)據(jù)待處理就處理一下,完畢關閉連接*/
if (_in_buffer.ReadAbleSize() > 0) {
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
//描述符觸發(fā)出錯事件
void HandleError() {
return HandleClose();
}
//描述符觸發(fā)任意事件: 1. 刷新連接的活躍度--延遲定時銷毀任務; 2. 調(diào)用組件使用者的任意事件回調(diào)
void HandleEvent() {
if (_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); }
if (_event_callback) { _event_callback(shared_from_this()); }
}
//連接獲取之后,所處的狀態(tài)下要進行各種設置(啟動讀監(jiān)控,調(diào)用回調(diào)函數(shù))
void EstablishedInLoop() {
// 1. 修改連接狀態(tài); 2. 啟動讀事件監(jiān)控; 3. 調(diào)用回調(diào)函數(shù)
assert(_statu == CONNECTING);//當前的狀態(tài)必須一定是上層的半連接狀態(tài)
_statu = CONNECTED;//當前函數(shù)執(zhí)行完畢,則連接進入已完成連接狀態(tài)
// 一旦啟動讀事件監(jiān)控就有可能會立即觸發(fā)讀事件,如果這時候啟動了非活躍連接銷毀
_channel.EnableRead();
if (_connected_callback) _connected_callback(shared_from_this());
}
//這個接口才是實際的釋放接口
void ReleaseInLoop() {
//1. 修改連接狀態(tài),將其置為DISCONNECTED
_statu = DISCONNECTED;
//2. 移除連接的事件監(jiān)控
_channel.Remove();
//3. 關閉描述符
_socket.Close();
//4. 如果當前定時器隊列中還有定時銷毀任務,則取消任務
if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop();
//5. 調(diào)用關閉回調(diào)函數(shù),避免先移除服務器管理的連接信息導致Connection被釋放,再去處理會出錯,因此先調(diào)用用戶的回調(diào)函數(shù)
if (_closed_callback) _closed_callback(shared_from_this());
//移除服務器內(nèi)部管理的連接信息
if (_server_closed_callback) _server_closed_callback(shared_from_this());
}
//這個接口并不是實際的發(fā)送接口,而只是把數(shù)據(jù)放到了發(fā)送緩沖區(qū),啟動了可寫事件監(jiān)控
void SendInLoop(Buffer &buf) {
if (_statu == DISCONNECTED) return ;
_out_buffer.WriteBufferAndPush(buf);
if (_channel.WriteAble() == false) {
_channel.EnableWrite();
}
}
//這個關閉操作并非實際的連接釋放操作,需要判斷還有沒有數(shù)據(jù)待處理,待發(fā)送
void ShutdownInLoop() {
_statu = DISCONNECTING;// 設置連接為半關閉狀態(tài)
if (_in_buffer.ReadAbleSize() > 0) {
if (_message_callback) _message_callback(shared_from_this(), &_in_buffer);
}
//要么就是寫入數(shù)據(jù)的時候出錯關閉,要么就是沒有待發(fā)送數(shù)據(jù),直接關閉
if (_out_buffer.ReadAbleSize() > 0) {
if (_channel.WriteAble() == false) {
_channel.EnableWrite();
}
}
if (_out_buffer.ReadAbleSize() == 0) {
Release();
}
}
//啟動非活躍連接超時釋放規(guī)則
void EnableInactiveReleaseInLoop(int sec) {
//1. 將判斷標志 _enable_inactive_release 置為true
_enable_inactive_release = true;
//2. 如果當前定時銷毀任務已經(jīng)存在,那就刷新延遲一下即可
if (_loop->HasTimer(_conn_id)) {
return _loop->TimerRefresh(_conn_id);
}
//3. 如果不存在定時銷毀任務,則新增
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
void CancelInactiveReleaseInLoop() {
_enable_inactive_release = false;
if (_loop->HasTimer(_conn_id)) {
_loop->TimerCancel(_conn_id);
}
}
void UpgradeInLoop(const Any &context,
const ConnectedCallback &conn,
const MessageCallback &msg,
const ClosedCallback &closed,
const AnyEventCallback &event) {
_context = context;
_connected_callback = conn;
_message_callback = msg;
_closed_callback = closed;
_event_callback = event;
}
public:
Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),
_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),
_channel(loop, _sockfd) {
_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
}
~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }
//獲取管理的文件描述符
int Fd() { return _sockfd; }
//獲取連接ID
int Id() { return _conn_id; }
//是否處于CONNECTED狀態(tài)
bool Connected() { return (_statu == CONNECTED); }
//設置上下文--連接建立完成時進行調(diào)用
void SetContext(const Any &context) { _context = context; }
//獲取上下文,返回的是指針
Any *GetContext() { return &_context; }
void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }
void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }
void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }
void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }
void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }
//連接建立就緒后,進行channel回調(diào)設置,啟動讀監(jiān)控,調(diào)用_connected_callback
void Established() {
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
}
//發(fā)送數(shù)據(jù),將數(shù)據(jù)放到發(fā)送緩沖區(qū),啟動寫事件監(jiān)控
void Send(const char *data, size_t len) {
//外界傳入的data,可能是個臨時的空間,我們現(xiàn)在只是把發(fā)送操作壓入了任務池,有可能并沒有被立即執(zhí)行
//因此有可能執(zhí)行的時候,data指向的空間有可能已經(jīng)被釋放了。
Buffer buf;
buf.WriteAndPush(data, len);
_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
}
//提供給組件使用者的關閉接口--并不實際關閉,需要判斷有沒有數(shù)據(jù)待處理
void Shutdown() {
_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));
}
void Release() {
_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));
}
//啟動非活躍銷毀,并定義多長時間無通信就是非活躍,添加定時任務
void EnableInactiveRelease(int sec) {
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
}
//取消非活躍銷毀
void CancelInactiveRelease() {
_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
}
//切換協(xié)議---重置上下文以及階段性回調(diào)處理函數(shù) -- 而是這個接口必須在EventLoop線程中立即執(zhí)行
//防備新的事件觸發(fā)后,處理的時候,切換任務還沒有被執(zhí)行--會導致數(shù)據(jù)使用原協(xié)議處理了。
void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,
const ClosedCallback &closed, const AnyEventCallback &event) {
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
}
};
17.監(jiān)聽描述符管理Acceptor模塊實現(xiàn)
Acceptor模塊:對監(jiān)聽套接字進行管理
1.創(chuàng)建一個監(jiān)聽套接字
2.啟動讀事件監(jiān)控
3.事件觸發(fā)后,獲取新連接
4.調(diào)用新連接獲取成功后的回調(diào)函數(shù)
為新連接創(chuàng)建Connection進行管理(這一步懷是 Acceptor模塊操作,應該是服務器模塊)
因為Acceptor模塊只進行監(jiān)聽連接的管理,因此獲取到新連接的描述符之后,對于新連接描述符如何處理其實并不關心,對于新連接如何處理,應該是服務器模塊來管理的
服務器模塊,實現(xiàn)了一個對于新連接描述符處理的函數(shù),將這個函數(shù)設置給Acceptor模塊中的回調(diào)函數(shù)
class Acceptor
{
private:
Socket _socket;//用于創(chuàng)建監(jiān)聽套接字
EventLoop *_loop; //用于對監(jiān)聽套接字進行事件監(jiān)控
Channel _channel; //用于對監(jiān)聽套接字進行事件管理
using AcceptCallback = std::function<void(int)>;
AcceptCallback _accept_callback;
private:
/*監(jiān)聽套接字的讀事件回調(diào)處理函數(shù)---獲取新連接,調(diào)用_accept_callback函數(shù)進行新連接處理*/
void HandleRead() {
int newfd = _socket.Accept();
if (newfd < 0) {
return ;
}
if (_accept_callback) _accept_callback(newfd);
}
int CreateServer(int port) {
bool ret = _socket.CreateServer(port);
assert(ret == true);
return _socket.Fd();
}
public:
/*不能將啟動讀事件監(jiān)控,放到構造函數(shù)中,必須在設置回調(diào)函數(shù)后,再去啟動*/
/*否則有可能造成啟動監(jiān)控后,立即有事件,處理的時候,回調(diào)函數(shù)還沒設置:新連接得不到處理,且資源泄漏*/
Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop),
_channel(loop, _socket.Fd()) {
_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
}
void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }
void Listen() { _channel.EnableRead(); }
};
18.TcpServer模塊實現(xiàn)
對所有模塊的整合,通過TcpServer模塊實例化對象,可以非常簡單的完成一個服務器的搭建
管理:
1.Acceptor對象,創(chuàng)建一個監(jiān)聽套接字
2.EventLoop對象,baseloop對象,實現(xiàn)對監(jiān)聽套接字的事件監(jiān)控
3.std::unordered_ map <uint64_ t,PtrConnection>_ conns,實現(xiàn)對所有新建連接的管理
4.LoopThreadPool對象,創(chuàng)建loop線程池, 對新建連接進行事件監(jiān)控及處理
功能:
1.設置從屬線程池數(shù)量
2.啟動服務器
3.設置各種回調(diào)函數(shù)(連接建立完成,消息,關閉,任意),用戶設置給TcpServer,TcpServer設 置給獲取的新連接
4.是否啟動非活躍連接超時銷毀功能
5.添加定時任務功能
流程:
1.在TcpServer中實例化一個Acceptor對象, 以及-個EventLoop對象(baseloop)
2.將Acceptor掛到baseloop上進行事件監(jiān)控
3.一旦Acceptor對象就緒了可讀事件,則執(zhí)行讀事件回到函數(shù)獲取新建連接
4.對新連接,創(chuàng)建一個Connection進行管理
5.對連接對應的Connecton設置功能回調(diào)(連接完成回調(diào),消息回調(diào),關閉回調(diào),任意事件回調(diào))6.啟動Connection的非活躍連接的超時銷毀規(guī)則
7.將新連接對應的Connection掛到L oopThreadPool中的從屬線程對應的EventLoop中進行事件監(jiān)控
8.一旦Connection對應的連接就緒了可讀事件,則這個時候執(zhí)行事件回調(diào)函數(shù),讀取數(shù)據(jù),讀取完畢
后調(diào)用TcpServer設置的消息回調(diào)
class TcpServer
{
private:
uint64_t _next_id; //這是一個自動增長的連接ID,
int _port;
int _timeout; //這是非活躍連接的統(tǒng)計時間---多長時間無通信就是非活躍連接
bool _enable_inactive_release;//是否啟動了非活躍連接超時銷毀的判斷標志
EventLoop _baseloop; //這是主線程的EventLoop對象,負責監(jiān)聽事件的處理
Acceptor _acceptor; //這是監(jiān)聽套接字的管理對象
LoopThreadPool _pool; //這是從屬EventLoop線程池
std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有連接對應的shared_ptr對象
using ConnectedCallback = std::function<void(const PtrConnection&)>;
using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;
using ClosedCallback = std::function<void(const PtrConnection&)>;
using AnyEventCallback = std::function<void(const PtrConnection&)>;
using Functor = std::function<void()>;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
private:
void RunAfterInLoop(const Functor &task, int delay) {
_next_id++;
_baseloop.TimerAdd(_next_id, delay, task);
}
//為新連接構造一個Connection進行管理
void NewConnection(int fd) {
_next_id++;
PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
conn->SetMessageCallback(_message_callback);
conn->SetClosedCallback(_closed_callback);
conn->SetConnectedCallback(_connected_callback);
conn->SetAnyEventCallback(_event_callback);
conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//啟動非活躍超時銷毀
conn->Established();//就緒初始化
_conns.insert(std::make_pair(_next_id, conn));
}
void RemoveConnectionInLoop(const PtrConnection &conn) {
int id = conn->Id();
auto it = _conns.find(id);
if (it != _conns.end()) {
_conns.erase(it);
}
}
//從管理Connection的_conns中移除連接信息
void RemoveConnection(const PtrConnection &conn) {
_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
}
public:
TcpServer(int port):
_port(port),
_next_id(0),
_enable_inactive_release(false),
_acceptor(&_baseloop, port),
_pool(&_baseloop) {
_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_acceptor.Listen();//將監(jiān)聽套接字掛到baseloop上
}
void SetThreadCount(int count) { return _pool.SetThreadCount(count); }
void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }
void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }
void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }
void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }
void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }
//用于添加一個定時任務
void RunAfter(const Functor &task, int delay) {
_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
}
void Start() { _pool.Create(); _baseloop.Start(); }
};
19.基于TcpServer實現(xiàn)回顯服務器
#include "../source/server.hpp"
class EchoServer
{
private:
TcpServer _server;
private:
void OnConnected(const PtrConnection &conn)
{
DBG_LOG("NEW CONNECTION:%p", conn.get());
}
void OnClosed(const PtrConnection &conn)
{
DBG_LOG("CLOSE CONNECTION:%p", conn.get());
}
void OnMessage(const PtrConnection &conn, Buffer *buf)
{
conn->Send(buf->ReadPosition(), buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
conn->Shutdown();
}
public:
EchoServer(int port) : _server(port)
{
_server.SetThreadCount(2);
_server.EnableInactiveRelease(10);
_server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));
_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void Start() { _server.Start(); }
};
#include "echo.hpp"
int main()
{
EchoServer server(8081);
server.Start();
return 0;
}
19.1EchoServer模塊關系圖
20.HTTP協(xié)議?持模塊實現(xiàn)
20.1Util實??具類實現(xiàn)
std::unordered_map<int, std::string> _statu_msg = {
{100, "Continue"},
{101, "Switching Protocol"},
{102, "Processing"},
{103, "Early Hints"},
{200, "OK"},
{201, "Created"},
{202, "Accepted"},
{203, "Non-Authoritative Information"},
{204, "No Content"},
{205, "Reset Content"},
{206, "Partial Content"},
{207, "Multi-Status"},
{208, "Already Reported"},
{226, "IM Used"},
{300, "Multiple Choice"},
{301, "Moved Permanently"},
{302, "Found"},
{303, "See Other"},
{304, "Not Modified"},
{305, "Use Proxy"},
{306, "unused"},
{307, "Temporary Redirect"},
{308, "Permanent Redirect"},
{400, "Bad Request"},
{401, "Unauthorized"},
{402, "Payment Required"},
{403, "Forbidden"},
{404, "Not Found"},
{405, "Method Not Allowed"},
{406, "Not Acceptable"},
{407, "Proxy Authentication Required"},
{408, "Request Timeout"},
{409, "Conflict"},
{410, "Gone"},
{411, "Length Required"},
{412, "Precondition Failed"},
{413, "Payload Too Large"},
{414, "URI Too Long"},
{415, "Unsupported Media Type"},
{416, "Range Not Satisfiable"},
{417, "Expectation Failed"},
{418, "I'm a teapot"},
{421, "Misdirected Request"},
{422, "Unprocessable Entity"},
{423, "Locked"},
{424, "Failed Dependency"},
{425, "Too Early"},
{426, "Upgrade Required"},
{428, "Precondition Required"},
{429, "Too Many Requests"},
{431, "Request Header Fields Too Large"},
{451, "Unavailable For Legal Reasons"},
{501, "Not Implemented"},
{502, "Bad Gateway"},
{503, "Service Unavailable"},
{504, "Gateway Timeout"},
{505, "HTTP Version Not Supported"},
{506, "Variant Also Negotiates"},
{507, "Insufficient Storage"},
{508, "Loop Detected"},
{510, "Not Extended"},
{511, "Network Authentication Required"}};
std::unordered_map<std::string, std::string> _mime_msg = {
{".aac", "audio/aac"},
{".abw", "application/x-abiword"},
{".arc", "application/x-freearc"},
{".avi", "video/x-msvideo"},
{".azw", "application/vnd.amazon.ebook"},
{".bin", "application/octet-stream"},
{".bmp", "image/bmp"},
{".bz", "application/x-bzip"},
{".bz2", "application/x-bzip2"},
{".csh", "application/x-csh"},
{".css", "text/css"},
{".csv", "text/csv"},
{".doc", "application/msword"},
{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
{".eot", "application/vnd.ms-fontobject"},
{".epub", "application/epub+zip"},
{".gif", "image/gif"},
{".htm", "text/html"},
{".html", "text/html"},
{".ico", "image/vnd.microsoft.icon"},
{".ics", "text/calendar"},
{".jar", "application/java-archive"},
{".jpeg", "image/jpeg"},
{".jpg", "image/jpeg"},
{".js", "text/javascript"},
{".json", "application/json"},
{".jsonld", "application/ld+json"},
{".mid", "audio/midi"},
{".midi", "audio/x-midi"},
{".mjs", "text/javascript"},
{".mp3", "audio/mpeg"},
{".mpeg", "video/mpeg"},
{".mpkg", "application/vnd.apple.installer+xml"},
{".odp", "application/vnd.oasis.opendocument.presentation"},
{".ods", "application/vnd.oasis.opendocument.spreadsheet"},
{".odt", "application/vnd.oasis.opendocument.text"},
{".oga", "audio/ogg"},
{".ogv", "video/ogg"},
{".ogx", "application/ogg"},
{".otf", "font/otf"},
{".png", "image/png"},
{".pdf", "application/pdf"},
{".ppt", "application/vnd.ms-powerpoint"},
{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
{".rar", "application/x-rar-compressed"},
{".rtf", "application/rtf"},
{".sh", "application/x-sh"},
{".svg", "image/svg+xml"},
{".swf", "application/x-shockwave-flash"},
{".tar", "application/x-tar"},
{".tif", "image/tiff"},
{".tiff", "image/tiff"},
{".ttf", "font/ttf"},
{".txt", "text/plain"},
{".vsd", "application/vnd.visio"},
{".wav", "audio/wav"},
{".weba", "audio/webm"},
{".webm", "video/webm"},
{".webp", "image/webp"},
{".woff", "font/woff"},
{".woff2", "font/woff2"},
{".xhtml", "application/xhtml+xml"},
{".xls", "application/vnd.ms-excel"},
{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
{".xml", "application/xml"},
{".xul", "application/vnd.mozilla.xul+xml"},
{".zip", "application/zip"},
{".3gp", "video/3gpp"},
{".3g2", "video/3gpp2"},
{".7z", "application/x-7z-compressed"}};
class Util
{
public:
// 字符串分割函數(shù),將src字符串按照sep字符進行分割,得到的各個字串放到arry中,最終返回字串的數(shù)量
static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arry)
{
size_t offset = 0;
// 有10個字符,offset是查找的起始位置,范圍應該是0~9,offset==10就代表已經(jīng)越界了
while (offset < src.size())
{
size_t pos = src.find(sep, offset); // 在src字符串偏移量offset處,開始向后查找sep字符/字串,返回查找到的位置
if (pos == std::string::npos)
{ // 沒有找到特定的字符
// 將剩余的部分當作一個字串,放入arry中
if (pos == src.size())
break;
arry->push_back(src.substr(offset));
return arry->size();
}
if (pos == offset)
{
offset = pos + sep.size();
continue; // 當前字串是一個空的,沒有內(nèi)容
}
arry->push_back(src.substr(offset, pos - offset));
offset = pos + sep.size();
}
return arry->size();
}
// 讀取文件的所有內(nèi)容,將讀取的內(nèi)容放到一個Buffer中
static bool ReadFile(const std::string &filename, std::string *buf)
{
std::ifstream ifs(filename, std::ios::binary);
if (ifs.is_open() == false)
{
printf("OPEN %s FILE FAILED!!", filename.c_str());
return false;
}
size_t fsize = 0;
ifs.seekg(0, ifs.end); // 跳轉讀寫位置到末尾
fsize = ifs.tellg(); // 獲取當前讀寫位置相對于起始位置的偏移量,從末尾偏移剛好就是文件大小
ifs.seekg(0, ifs.beg); // 跳轉到起始位置
buf->resize(fsize); // 開辟文件大小的空間
ifs.read(&(*buf)[0], fsize);
if (ifs.good() == false)
{
printf("READ %s FILE FAILED!!", filename.c_str());
ifs.close();
return false;
}
ifs.close();
return true;
}
// 向文件寫入數(shù)據(jù)
static bool WriteFile(const std::string &filename, const std::string &buf)
{
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);
if (ofs.is_open() == false)
{
printf("OPEN %s FILE FAILED!!", filename.c_str());
return false;
}
ofs.write(buf.c_str(), buf.size());
if (ofs.good() == false)
{
ERR_LOG("WRITE %s FILE FAILED!", filename.c_str());
ofs.close();
return false;
}
ofs.close();
return true;
}
// URL編碼,避免URL中資源路徑與查詢字符串中的特殊字符與HTTP請求中特殊字符產(chǎn)生歧義
// 編碼格式:將特殊字符的ascii值,轉換為兩個16進制字符,前綴% C++ -> C%2B%2B
// 不編碼的特殊字符: RFC3986文檔規(guī)定 . - _ ~ 字母,數(shù)字屬于絕對不編碼字符
// RFC3986文檔規(guī)定,編碼格式 %HH
// W3C標準中規(guī)定,查詢字符串中的空格,需要編碼為+, 解碼則是+轉空格
static std::string UrlEncode(const std::string url, bool convert_space_to_plus)
{
std::string res;
for (auto &c : url)
{
if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c))
{
res += c;
continue;
}
if (c == ' ' && convert_space_to_plus == true)
{
res += '+';
continue;
}
// 剩下的字符都是需要編碼成為 %HH 格式
char tmp[4] = {0};
// snprintf 與 printf比較類似,都是格式化字符串,只不過一個是打印,一個是放到一塊空間中
snprintf(tmp, 4, "%%%02X", c);
res += tmp;
}
return res;
}
static char HEXTOI(char c)
{
if (c >= '0' && c <= '9')
{
return c - '0';
}
else if (c >= 'a' && c <= 'z')
{
return c - 'a' + 10;
}
else if (c >= 'A' && c <= 'Z')
{
return c - 'A' + 10;
}
return -1;
}
static std::string UrlDecode(const std::string url, bool convert_plus_to_space)
{
// 遇到了%,則將緊隨其后的2個字符,轉換為數(shù)字,第一個數(shù)字左移4位,然后加上第二個數(shù)字 + -> 2b %2b->2 << 4 + 11
std::string res;
for (int i = 0; i < url.size(); i++)
{
if (url[i] == '+' && convert_plus_to_space == true)
{
res += ' ';
continue;
}
if (url[i] == '%' && (i + 2) < url.size())
{
char v1 = HEXTOI(url[i + 1]);
char v2 = HEXTOI(url[i + 2]);
char v = v1 * 16 + v2;
res += v;
i += 2;
continue;
}
res += url[i];
}
return res;
}
// 響應狀態(tài)碼的描述信息獲取
static std::string StatuDesc(int statu)
{
auto it = _statu_msg.find(statu);
if (it != _statu_msg.end())
{
return it->second;
}
return "Unknow";
}
// 根據(jù)文件后綴名獲取文件mime
static std::string ExtMime(const std::string &filename)
{
// a.b.txt 先獲取文件擴展名
size_t pos = filename.find_last_of('.');
if (pos == std::string::npos)
{
return "application/octet-stream";
}
// 根據(jù)擴展名,獲取mime
std::string ext = filename.substr(pos);
auto it = _mime_msg.find(ext);
if (it == _mime_msg.end())
{
return "application/octet-stream";
}
return it->second;
}
// 判斷一個文件是否是一個目錄
static bool IsDirectory(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISDIR(st.st_mode);
}
// 判斷一個文件是否是一個普通文件
static bool IsRegular(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISREG(st.st_mode);
}
// http請求的資源路徑有效性判斷
// /index.html --- 前邊的/叫做相對根目錄 映射的是某個服務器上的子目錄
// 想表達的意思就是,客戶端只能請求相對根目錄中的資源,其他地方的資源都不予理會
// /../login, 這個路徑中的..會讓路徑的查找跑到相對根目錄之外,這是不合理的,不安全的
static bool ValidPath(const std::string &path)
{
// 思想:按照/進行路徑分割,根據(jù)有多少子目錄,計算目錄深度,有多少層,深度不能小于0
std::vector<std::string> subdir;
Split(path, "/", &subdir);
int level = 0;
for (auto &dir : subdir)
{
if (dir == "..")
{
level--; // 任意一層走出相對根目錄,就認為有問題
if (level < 0)
return false;
continue;
}
level++;
}
return true;
}
};
20.2HttpRequest請求類實現(xiàn)
class HttpRequest
{
public:
std::string _method; // 請求方法
std::string _path; // 資源路徑
std::string _version; // 協(xié)議版本
std::string _body; // 請求正文
std::smatch _matches; // 資源路徑的正則提取數(shù)據(jù)
std::unordered_map<std::string, std::string> _headers; // 頭部字段
std::unordered_map<std::string, std::string> _params; // 查詢字符串
public:
HttpRequest() : _version("HTTP/1.1") {}
void ReSet()
{
_method.clear();
_path.clear();
_version = "HTTP/1.1";
_body.clear();
std::smatch match;
_matches.swap(match);
_headers.clear();
_params.clear();
}
// 插入頭部字段
void SetHeader(const std::string &key, const std::string &val)
{
_headers.insert(std::make_pair(key, val));
}
// 判斷是否存在指定頭部字段
bool HasHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 獲取指定頭部字段的值
std::string GetHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return "";
}
return it->second;
}
// 插入查詢字符串
void SetParam(const std::string &key, const std::string &val)
{
_params.insert(std::make_pair(key, val));
}
// 判斷是否有某個指定的查詢字符串
bool HasParam(const std::string &key) const
{
auto it = _params.find(key);
if (it == _params.end())
{
return false;
}
return true;
}
// 獲取指定的查詢字符串
std::string GetParam(const std::string &key) const
{
auto it = _params.find(key);
if (it == _params.end())
{
return "";
}
return it->second;
}
// 獲取正文長度
size_t ContentLength() const
{
// Content-Length: 1234\r\n
bool ret = HasHeader("Content-Length");
if (ret == false)
{
return 0;
}
std::string clen = GetHeader("Content-Length");
return std::stol(clen);
}
// 判斷是否是短鏈接
bool Close() const
{
// 沒有Connection字段,或者有Connection但是值是close,則都是短鏈接,否則就是長連接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;
}
return true;
}
};
20.3HttpResponse響應類實現(xiàn)
class HttpResponse
{
public:
int _statu;
bool _redirect_flag;
std::string _body;
std::string _redirect_url;
std::unordered_map<std::string, std::string> _headers;
public:
HttpResponse() : _redirect_flag(false), _statu(200) {}
HttpResponse(int statu) : _redirect_flag(false), _statu(statu) {}
void ReSet()
{
_statu = 200;
_redirect_flag = false;
_body.clear();
_redirect_url.clear();
_headers.clear();
}
// 插入頭部字段
void SetHeader(const std::string &key, const std::string &val)
{
_headers.insert(std::make_pair(key, val));
}
// 判斷是否存在指定頭部字段
bool HasHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 獲取指定頭部字段的值
std::string GetHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return "";
}
return it->second;
}
void SetContent(const std::string &body, const std::string &type = "text/html")
{
_body = body;
SetHeader("Content-Type", type);
}
void SetRedirect(const std::string &url, int statu = 302)
{
_statu = statu;
_redirect_flag = true;
_redirect_url = url;
}
// 判斷是否是短鏈接
bool Close()
{
// 沒有Connection字段,或者有Connection但是值是close,則都是短鏈接,否則就是長連接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;
}
return true;
}
};
20.4HttpContext上下文類實現(xiàn)
typedef enum
{
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
} HttpRecvStatu;
#define MAX_LINE 8192
class HttpContext
{
private:
int _resp_statu; // 響應狀態(tài)碼
HttpRecvStatu _recv_statu; // 當前接收及解析的階段狀態(tài)
HttpRequest _request; // 已經(jīng)解析得到的請求信息
private:
bool ParseHttpLine(const std::string &line)
{
std::smatch matches;
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);
bool ret = std::regex_match(line, matches, e);
if (ret == false)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; // BAD REQUEST
return false;
}
// 0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1
// 1 : GET
// 2 : /bitejiuyeke/login
// 3 : user=xiaoming&pass=123123
// 4 : HTTP/1.1
// 請求方法的獲取
_request._method = matches[1];
std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);
// 資源路徑的獲取,需要進行URL解碼操作,但是不需要+轉空格
_request._path = Util::UrlDecode(matches[2], false);
// 協(xié)議版本的獲取
_request._version = matches[4];
// 查詢字符串的獲取與處理
std::vector<std::string> query_string_arry;
std::string query_string = matches[3];
// 查詢字符串的格式 key=val&key=val....., 先以 & 符號進行分割,得到各個字串
Util::Split(query_string, "&", &query_string_arry);
// 針對各個字串,以 = 符號進行分割,得到key 和val, 得到之后也需要進行URL解碼
for (auto &str : query_string_arry)
{
size_t pos = str.find("=");
if (pos == std::string::npos)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; // BAD REQUEST
return false;
}
std::string key = Util::UrlDecode(str.substr(0, pos), true);
std::string val = Util::UrlDecode(str.substr(pos + 1), true);
_request.SetParam(key, val);
}
return true;
}
bool RecvHttpLine(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_LINE)
return false;
// 1. 獲取一行數(shù)據(jù),帶有末尾的換行
std::string line = buf->GetLineAndPop();
// 2. 需要考慮的一些要素:緩沖區(qū)中的數(shù)據(jù)不足一行, 獲取的一行數(shù)據(jù)超大
if (line.size() == 0)
{
// 緩沖區(qū)中的數(shù)據(jù)不足一行,則需要判斷緩沖區(qū)的可讀數(shù)據(jù)長度,如果很長了都不足一行,這是有問題的
if (buf->ReadAbleSize() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
// 緩沖區(qū)中數(shù)據(jù)不足一行,但是也不多,就等等新數(shù)據(jù)的到來
return true;
}
if (line.size() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
bool ret = ParseHttpLine(line);
if (ret == false)
{
return false;
}
// 首行處理完畢,進入頭部獲取階段
_recv_statu = RECV_HTTP_HEAD;
return true;
}
bool RecvHttpHead(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_HEAD)
return false;
// 一行一行取出數(shù)據(jù),直到遇到空行為止, 頭部的格式 key: val\r\nkey: val\r\n....
while (1)
{
std::string line = buf->GetLineAndPop();
// 2. 需要考慮的一些要素:緩沖區(qū)中的數(shù)據(jù)不足一行, 獲取的一行數(shù)據(jù)超大
if (line.size() == 0)
{
// 緩沖區(qū)中的數(shù)據(jù)不足一行,則需要判斷緩沖區(qū)的可讀數(shù)據(jù)長度,如果很長了都不足一行,這是有問題的
if (buf->ReadAbleSize() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
// 緩沖區(qū)中數(shù)據(jù)不足一行,但是也不多,就等等新數(shù)據(jù)的到來
return true;
}
if (line.size() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // URI TOO LONG
return false;
}
if (line == "\n" || line == "\r\n")
{
break;
}
bool ret = ParseHttpHead(line);
if (ret == false)
{
return false;
}
}
// 頭部處理完畢,進入正文獲取階段
_recv_statu = RECV_HTTP_BODY;
return true;
}
bool ParseHttpHead(std::string &line)
{
// key: val\r\nkey: val\r\n....
if (line.back() == '\n')
line.pop_back(); // 末尾是換行則去掉換行字符
if (line.back() == '\r')
line.pop_back(); // 末尾是回車則去掉回車字符
size_t pos = line.find(": ");
if (pos == std::string::npos)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; //
return false;
}
std::string key = line.substr(0, pos);
std::string val = line.substr(pos + 2);
_request.SetHeader(key, val);
return true;
}
bool RecvHttpBody(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_BODY)
return false;
// 1. 獲取正文長度
size_t content_length = _request.ContentLength();
if (content_length == 0)
{
// 沒有正文,則請求接收解析完畢
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 2. 當前已經(jīng)接收了多少正文,其實就是往 _request._body 中放了多少數(shù)據(jù)了
size_t real_len = content_length - _request._body.size(); // 實際還需要接收的正文長度
// 3. 接收正文放到body中,但是也要考慮當前緩沖區(qū)中的數(shù)據(jù),是否是全部的正文
// 3.1 緩沖區(qū)中數(shù)據(jù),包含了當前請求的所有正文,則取出所需的數(shù)據(jù)
if (buf->ReadAbleSize() >= real_len)
{
_request._body.append(buf->ReadPosition(), real_len);
buf->MoveReadOffset(real_len);
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 3.2 緩沖區(qū)中數(shù)據(jù),無法滿足當前正文的需要,數(shù)據(jù)不足,取出數(shù)據(jù),然后等待新數(shù)據(jù)到來
_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
return true;
}
public:
HttpContext() : _resp_statu(200), _recv_statu(RECV_HTTP_LINE) {}
void ReSet()
{
_resp_statu = 200;
_recv_statu = RECV_HTTP_LINE;
_request.ReSet();
}
int RespStatu() { return _resp_statu; }
HttpRecvStatu RecvStatu() { return _recv_statu; }
HttpRequest &Request() { return _request; }
// 接收并解析HTTP請求
void RecvHttpRequest(Buffer *buf)
{
// 不同的狀態(tài),做不同的事情,但是這里不要break, 因為處理完請求行后,應該立即處理頭部,而不是退出等新數(shù)據(jù)
switch (_recv_statu)
{
case RECV_HTTP_LINE:
RecvHttpLine(buf);
case RECV_HTTP_HEAD:
RecvHttpHead(buf);
case RECV_HTTP_BODY:
RecvHttpBody(buf);
}
return;
}
};
20.5HttpServer類實現(xiàn)
class HttpServer
{
private:
using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
using Handlers = std::vector<std::pair<std::regex, Handler>>;
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std::string _basedir; // 靜態(tài)資源根目錄
TcpServer _server;
private:
void ErrorHandler(const HttpRequest &req, HttpResponse *rsp)
{
// 1. 組織一個錯誤展示頁面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(rsp->_statu);
body += " ";
body += Util::StatuDesc(rsp->_statu);
body += "</h1>";
body += "</body>";
body += "</html>";
// 2. 將頁面數(shù)據(jù),當作響應正文,放入rsp中
rsp->SetContent(body, "text/html");
}
// 將HttpResponse中的要素按照http協(xié)議格式進行組織,發(fā)送
void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp)
{
// 1. 先完善頭部字段
if (req.Close() == true)
{
rsp.SetHeader("Connection", "close");
}
else
{
rsp.SetHeader("Connection", "keep-alive");
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false)
{
rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false)
{
rsp.SetHeader("Content-Type", "application/octet-stream");
}
if (rsp._redirect_flag == true)
{
rsp.SetHeader("Location", rsp._redirect_url);
}
// 2. 將rsp中的要素,按照http協(xié)議格式進行組織
std::stringstream rsp_str;
rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n";
for (auto &head : rsp._headers)
{
rsp_str << head.first << ": " << head.second << "\r\n";
}
rsp_str << "\r\n";
rsp_str << rsp._body;
// 3. 發(fā)送數(shù)據(jù)
conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
}
bool IsFileHandler(const HttpRequest &req)
{
// 1. 必須設置了靜態(tài)資源根目錄
if (_basedir.empty())
{
return false;
}
// 2. 請求方法,必須是GET / HEAD請求方法
if (req._method != "GET" && req._method != "HEAD")
{
return false;
}
// 3. 請求的資源路徑必須是一個合法路徑
if (Util::ValidPath(req._path) == false)
{
return false;
}
// 4. 請求的資源必須存在,且是一個普通文件
// 有一種請求比較特殊 -- 目錄:/, /image/, 這種情況給后邊默認追加一個 index.html
// index.html /image/a.png
// 不要忘了前綴的相對根目錄,也就是將請求路徑轉換為實際存在的路徑 /image/a.png -> ./wwwroot/image/a.png
std::string req_path = _basedir + req._path; // 為了避免直接修改請求的資源路徑,因此定義一個臨時對象
if (req._path.back() == '/')
{
req_path += "index.html";
}
if (Util::IsRegular(req_path) == false)
{
return false;
}
return true;
}
// 靜態(tài)資源的請求處理 --- 將靜態(tài)資源文件的數(shù)據(jù)讀取出來,放到rsp的_body中, 并設置mime
void FileHandler(const HttpRequest &req, HttpResponse *rsp)
{
std::string req_path = _basedir + req._path;
if (req._path.back() == '/')
{
req_path += "index.html";
}
bool ret = Util::ReadFile(req_path, &rsp->_body);
if (ret == false)
{
return;
}
std::string mime = Util::ExtMime(req_path);
rsp->SetHeader("Content-Type", mime);
return;
}
// 功能性請求的分類處理
void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers)
{
// 在對應請求方法的路由表中,查找是否含有對應資源請求的處理函數(shù),有則調(diào)用,沒有則發(fā)揮404
// 思想:路由表存儲的時鍵值對 -- 正則表達式 & 處理函數(shù)
// 使用正則表達式,對請求的資源路徑進行正則匹配,匹配成功就使用對應函數(shù)進行處理
// /numbers/(\d+) /numbers/12345
for (auto &handler : handlers)
{
const std::regex &re = handler.first;
const Handler &functor = handler.second;
bool ret = std::regex_match(req._path, req._matches, re);
if (ret == false)
{
continue;
}
return functor(req, rsp); // 傳入請求信息,和空的rsp,執(zhí)行處理函數(shù)
}
rsp->_statu = 404;
}
void Route(HttpRequest &req, HttpResponse *rsp)
{
// 1. 對請求進行分辨,是一個靜態(tài)資源請求,還是一個功能性請求
// 靜態(tài)資源請求,則進行靜態(tài)資源的處理
// 功能性請求,則需要通過幾個請求路由表來確定是否有處理函數(shù)
// 既不是靜態(tài)資源請求,也沒有設置對應的功能性請求處理函數(shù),就返回405
if (IsFileHandler(req) == true)
{
// 是一個靜態(tài)資源請求, 則進行靜態(tài)資源請求的處理
return FileHandler(req, rsp);
}
if (req._method == "GET" || req._method == "HEAD")
{
return Dispatcher(req, rsp, _get_route);
}
else if (req._method == "POST")
{
return Dispatcher(req, rsp, _post_route);
}
else if (req._method == "PUT")
{
return Dispatcher(req, rsp, _put_route);
}
else if (req._method == "DELETE")
{
return Dispatcher(req, rsp, _delete_route);
}
rsp->_statu = 405; // Method Not Allowed
return;
}
// 設置上下文
void OnConnected(const PtrConnection &conn)
{
conn->SetContext(HttpContext());
DBG_LOG("NEW CONNECTION %p", conn.get());
}
// 緩沖區(qū)數(shù)據(jù)解析+處理
void OnMessage(const PtrConnection &conn, Buffer *buffer)
{
while (buffer->ReadAbleSize() > 0)
{
// 1. 獲取上下文
HttpContext *context = conn->GetContext()->get<HttpContext>();
// 2. 通過上下文對緩沖區(qū)數(shù)據(jù)進行解析,得到HttpRequest對象
// 1. 如果緩沖區(qū)的數(shù)據(jù)解析出錯,就直接回復出錯響應
// 2. 如果解析正常,且請求已經(jīng)獲取完畢,才開始去進行處理
context->RecvHttpRequest(buffer);
HttpRequest &req = context->Request();
HttpResponse rsp(context->RespStatu());
if (context->RespStatu() >= 400)
{
// 進行錯誤響應,關閉連接
ErrorHandler(req, &rsp); // 填充一個錯誤顯示頁面數(shù)據(jù)到rsp中
WriteReponse(conn, req, rsp); // 組織響應發(fā)送給客戶端
context->ReSet();
buffer->MoveReadOffset(buffer->ReadAbleSize()); // 出錯了就把緩沖區(qū)數(shù)據(jù)清空
conn->Shutdown(); // 關閉連接
return;
}
if (context->RecvStatu() != RECV_HTTP_OVER)
{
// 當前請求還沒有接收完整,則退出,等新數(shù)據(jù)到來再重新繼續(xù)處理
return;
}
// 3. 請求路由 + 業(yè)務處理
Route(req, &rsp);
// 4. 對HttpResponse進行組織發(fā)送
WriteReponse(conn, req, rsp);
// 5. 重置上下文
context->ReSet();
// 6. 根據(jù)長短連接判斷是否關閉連接或者繼續(xù)處理
if (rsp.Close() == true)
conn->Shutdown(); // 短鏈接則直接關閉
}
return;
}
public:
HttpServer(int port, int timeout = DEFALT_TIMEOUT) : _server(port)
{
_server.EnableInactiveRelease(timeout);
_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void SetBaseDir(const std::string &path)
{
assert(Util::IsDirectory(path) == true);
_basedir = path;
}
/*設置/添加,請求(請求的正則表達)與處理函數(shù)的映射關系*/
void Get(const std::string &pattern, const Handler &handler)
{
_get_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Post(const std::string &pattern, const Handler &handler)
{
_post_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Put(const std::string &pattern, const Handler &handler)
{
_put_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Delete(const std::string &pattern, const Handler &handler)
{
_delete_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void SetThreadCount(int count)
{
_server.SetThreadCount(count);
}
void Listen()
{
_server.Start();
}
};
20.6基于HttpServer搭建HTTP服務器
#include "http.hpp"
#define WWWROOT "./wwwroot/"
std::string RequestStr(const HttpRequest &req) {
std::stringstream ss;
ss << req._method << " " << req._path << " " << req._version << "\r\n";
for (auto &it : req._params) {
ss << it.first << ": " << it.second << "\r\n";
}
for (auto &it : req._headers) {
ss << it.first << ": " << it.second << "\r\n";
}
ss << "\r\n";
ss << req._body;
return ss.str();
}
void Hello(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestStr(req), "text/plain");
}
void Login(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestStr(req), "text/plain");
}
void PutFile(const HttpRequest &req, HttpResponse *rsp)
{
std::string pathname = WWWROOT + req._path;
Util::WriteFile(pathname, req._body);
}
void DelFile(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestStr(req), "text/plain");
}
int main()
{
HttpServer server(8081);
server.SetThreadCount(3);
server.SetBaseDir(WWWROOT);//設置靜態(tài)資源根目錄,告訴服務器有靜態(tài)資源請求到來,需要到哪里去找資源文件
server.Get("/hello", Hello);
server.Post("/login", Login);
server.Put("/1234.txt", PutFile);
server.Delete("/1234.txt", DelFile);
server.Listen();
return 0;
}
21.功能測試
21.1使用Postman進行基本功能測試
21.2長連接連續(xù)請求測試
?個連接中每隔3s向服務器發(fā)送?個請求,查看是否會收到響應。
預期結果:可以正常進行長連接的通信。
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "10.0.24.11");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DEBUG_LOG("[%s]", buf);
sleep(3);
}
cli_sock.Close();
return 0;
}
客戶端每三秒發(fā)送一次數(shù)據(jù),刷新活躍度。長連接測試正常。
21.3超時連接測試
創(chuàng)建一個客戶端,給服務器發(fā)送一次數(shù)據(jù)后 不動了,查看服務器是否會正常的超時關閉連接。
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "10.0.24.11");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DEBUG_LOG("[%s]", buf);
sleep(15);
}
cli_sock.Close();
return 0;
}
客戶端發(fā)送一次數(shù)據(jù)后,超時時間內(nèi)再無動作。非活躍連接正常超時關閉,測試正常。
21.4不完整請求測試
給服務器發(fā)送一個數(shù)據(jù),告訴服務器要發(fā)送1024字節(jié)的數(shù)據(jù),但是實際發(fā)送的數(shù)據(jù)不足1024,查看服務器處理結果。
//不完整請求測試
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "10.0.24.11");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nbitejiuyeke";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
//assert(cli_sock.Send(req.c_str(), req.size()) != -1);
//assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf,1023));
DEBUG_LOG("[%s]", buf);
sleep(3);
}
cli_sock.Close();
return 0;
}
1. 如果數(shù)據(jù)只發(fā)送一次,服務器將得不到完整請求,就不會進行業(yè)務處理,客戶端也就得不到響應,最終超時關閉連接。
?2. 連著給服務器發(fā)送了多次 小的請求, 服務器會將后邊的請求當作前邊請求的正文進行處理,而后邊處理的時候有可能就會因為處理錯誤而關閉連接。
21.5業(yè)務處理超時測試
業(yè)務處理超時,查看服務器的處理情況
當服務器達到了一個性能瓶頸,在一次業(yè)務處理中花費了太長的時間(超過了服務器設置的非活躍超時時間)
1. 在一次業(yè)務處理中耗費太長時間,導致其他的連接也被連累超時,其他的連接有可能會被拖累超時釋放。假設現(xiàn)在 12345描述符就緒了, 在處理1的時候花費了30s處理完,超時了,導致2345描述符因為長時間沒有刷新活躍度
1. 如果接下來的2345描述符都是通信連接描述符,如果都就緒了,則并不影響,因為接下來就會進行處理并刷新活躍度
2. 如果接下來的2號描述符是定時器事件描述符 定時器觸發(fā)超時,執(zhí)行定時任務,就會將345描述符給釋放掉
2.1 這時一旦345描述符對應的連接被釋放,接下來在處理345事件的時候就會導致程序崩潰(內(nèi)存訪問錯誤)
2.2 因此這時候,在本次事件處理中,并不能直接對連接進行釋放,而應該將釋放操作壓入到任務池中,等到事件處理完了執(zhí)行任務池中的任務的時候,再去釋放。
int main()
{
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < 10; i++) {
pid_t pid = fork();
if (pid < 0) {
DEBUG_LOG("FORK ERROR");
return -1;
}else if (pid == 0) {
Socket cli_sock;
cli_sock.CreateClient(8085, "10.0.24.11");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DEBUG_LOG("[%s]", buf);
}
cli_sock.Close();
exit(0);
}
}
while(1) sleep(1);
return 0;
}
21.6一次發(fā)送多條數(shù)據(jù)測試
一次性給服務器發(fā)送多條數(shù)據(jù),然后查看服務器的處理結果。
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "10.0.24.11");
std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
while(1) {
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DEBUG_LOG("[%s]", buf);
sleep(3);
}
cli_sock.Close();
return 0;
}
每一條請求都應該得到正常處理。
21.7大文件傳輸測試
大文件傳輸測試,給服務器上傳一個大文件,服務器將文件保存下來,觀察處理結果。上傳的文件,和服務器保存的文件一致。
準備好一個測試文件,資源有限,創(chuàng)建一個100MB大小的log.txt。
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8085, "10.0.24.11");
std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n";
std::string body;
Util::ReadFile("./log.txt", body);
req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";
assert(cli_sock.Send(req.c_str(), req.size()) != -1);
assert(cli_sock.Send(body.c_str(), body.size()) != -1);
char buf[1024] = {0};
assert(cli_sock.Recv(buf, 1023));
DEBUG_LOG("[%s]", buf);
sleep(3);
cli_sock.Close();
return 0;
}
文件上傳成功:
收到的文件:
??對比兩個文件內(nèi)容是否相同:
根據(jù)測試,文件傳輸也沒有問題。
21.8抗壓力測試
通過測試工具模擬大量客戶端向服務器發(fā)送連接請求。
模擬20000個客戶端同時向服務器發(fā)送請求,沒有出現(xiàn)連接失敗。
測試結論(參考)
性能測試環(huán)境:
服務端:2核2G帶寬為1M的云服務器。
客戶端:4核8G的虛擬機通過webbench工具模擬客戶端,創(chuàng)建大量線程連接服務器,發(fā)送請求,在收到響應后關閉連接,開始下一個連接的建立。
測試結論:
服務器并發(fā)量:可以同時處理20000-30000個客戶端的請求而不會出現(xiàn)連接失敗。文章來源:http://www.zghlxwxcb.cn/news/detail-712840.html
QPS:(Query Per Second)每秒查詢率107左右。文章來源地址http://www.zghlxwxcb.cn/news/detail-712840.html
到了這里,關于C++項目:仿mudou庫one thread one loop式并發(fā)服務器實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!