1.IO
1.1基本介紹
IO實際上就是 input && output
在馮諾依曼體系中就是與外設(shè)交互的意思,而我們的網(wǎng)絡通信本質(zhì)上也是一種IO。
1.2基礎(chǔ)io的低效性
為什么基礎(chǔ)io會低效呢?我們以讀取為例來介紹。
當我們底層調(diào)用read函數(shù)的時候,如果緩沖區(qū)沒有數(shù)據(jù) ,我們就會將pcb放入等待隊列,進行阻塞。
當我們底層調(diào)用read函數(shù)的時候,如果緩沖區(qū)有數(shù)據(jù),我們就會讀取。
這么來看我們io的本質(zhì)其實就是數(shù)據(jù)拷貝+等待。
實際上回顧我們之前的文章,不光是在網(wǎng)絡中在本地主機進行IO的時候也是進行這兩個階段。
當我們的程序需要讀取磁盤中的內(nèi)容時,磁盤需要先將內(nèi)容加載到內(nèi)存里面。
而在加載還未完成時我們的程序在做什么呢? 阻塞 或者說 等。
這也就是為什么我們使用scanf
和cin
等輸入函數(shù)的時候 命令行會阻塞住。
在進行IO的時間里大部分時間都在等待。
1.3如何提高IO效率
順著我們上面說的實際就是降低這個等待時間的比重。
1.4五種IO模型
經(jīng)歷了這么長時間的發(fā)展之后,計算機的前輩們已經(jīng)總結(jié)出來了五種IO模型,讓我們一起來學習一下吧。
如果光將一些概念大家應該很難理解,這里我們借用釣魚的例子來為大家大致分析一下(把釣起魚的一瞬間抽象成拷貝,等魚兒上鉤的時間想象成阻塞時間)。
例子一:張三去釣魚的時候不喜歡被打擾,甩鉤之后就一直盯著魚漂,等什么時候余漂有反應了就立刻拉鉤。
例子二:李四去釣魚的時候?qū)P牟涣耍︺^之后就喜歡刷刷手機,每刷一會兒手機就看一眼魚漂,如果有反應了就拉鉤,如果沒反應就繼續(xù)刷手機。
例子三:王五去釣魚的時候喜歡在魚漂上掛個鈴鐺 ,之后就去刷手機玩了。 如果鈴鐺響了,那么王五就去拉鉤;如果沒響,就一直玩手。
例子四:趙六去釣魚的時候喜歡多備幾根魚竿,所有魚竿下水之后趙六就在旁邊巡視,哪一根魚竿的魚漂動了就去拉哪根魚竿。
例子五:田七去釣魚的時候帶著一個小跟班,每次只需要布置任務讓小跟班釣多少魚就好,自己處理自己的事情去了。
上面的五個例子分別代表了五個IO模型分別是:
-
故事一: 阻塞
-
故事二: 非阻塞輪詢
-
故事三: 信號驅(qū)動
-
故事四: 多路復用多路轉(zhuǎn)接
-
故事五: 異步IO
理論上來講,這些例子當中例子四就是我們的多路復用多路轉(zhuǎn)接最為高效,當然這里提到異步IO我們就多提一嘴,其實有關(guān)異步IO和同步IO的概念一直都有爭論,有興趣的同學可以去了解一下,這里就不做過多講解。
1.5非阻塞模式的設(shè)置
如果想讓IO進行非阻塞的話 打開文件的時候就可以進行非阻塞設(shè)置,比如說 open
socket
。
但是如果我們使用每個函數(shù)的時候都記住它們的非阻塞標志未免也有點太麻煩了,
所以說我們這里使用 fcntl
函數(shù)來統(tǒng)一設(shè)置:
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */);
fd是要進行操作的文件描述符
cmd是控制命令
arg是與命令相關(guān)的參數(shù)
如果設(shè)置失敗會返回-1 并且錯誤碼會被設(shè)置 成功返回大于等于0
參數(shù)二的不同功能:
復制一個現(xiàn)有的描述符(cmd=F_DUPFD).
獲得/設(shè)置文件描述符標記(cmd=F_GETFD或F_SETFD).
獲得/設(shè)置文件狀態(tài)標記(cmd=F_GETFL或F_SETFL).
獲得/設(shè)置異步I/O所有權(quán)(cmd=F_GETOWN或F_SETOWN)
獲得/設(shè)置記錄鎖(cmd=F_GETLK,F_SETLK或F_SETLKW)
使用案例:
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
int main() {
// 獲取當前標準輸出的狀態(tài)標志
int flags = fcntl(STDOUT_FILENO, F_GETFL);
if (flags == -1) {
perror("fcntl");
return 1;
}
// 設(shè)置標準輸出為非阻塞模式
if (fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl");
return 1;
}
// 嘗試從標準輸出中讀取數(shù)據(jù),但不會阻塞
char buffer[1024];
ssize_t bytesRead = read(STDOUT_FILENO, buffer, sizeof(buffer));
if (bytesRead == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("No data available in non-blocking mode.\n");
} else {
perror("read");
}
} else {
// 讀取到數(shù)據(jù)
buffer[bytesRead] = '\0';
printf("Read %zd bytes: %s", bytesRead, buffer);
}
return 0;
}
我們首先使用fcntl函數(shù)獲取標準輸出的當前狀態(tài)標志,然后使用fcntl再次設(shè)置標準輸出為非阻塞模式(通過將O_NONBLOCK標志添加到原來的標志中)。接下來,我們嘗試從標準輸出中讀取數(shù)據(jù),但由于標準輸出已設(shè)置為非阻塞模式,如果沒有數(shù)據(jù)可用,read將立即返回,并且errno會被設(shè)置為EAGAIN或EWOULDBLOCK,表示沒有數(shù)據(jù)可讀。
當我們使用了非阻塞IO的時候,每次讀取如果遇到了 EWOULDBLOCK或EAGAIN我們就可以讓我們的進程去做一會兒其他事情。
2.IO多路轉(zhuǎn)接之Select
接下來我們會帶大家了解select這個函數(shù),其實質(zhì)上就是在io等這一步上做了如下操作:
- 幫用戶一次等待多個sock
- 如果有sock就緒了 select就要通知用戶 這些sock就緒了 讓用戶調(diào)用read/recv函數(shù)來進行讀取
2.1函數(shù)的基本了解
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
1.nfds:需要等待的文件當中文件描述符最大的+1
因為文件描述符是遞歸增長的,所以給定一個最大值就能確定范圍。
這樣子的話我們豈不是要等待從 0 ~ nfds-1所有文件描述符了?關(guān)于這個問題看了下面的參數(shù)介紹就能明白了.
2.readfds 是一個指向讀取文件描述符集合的指針。
writefds 是一個指向?qū)懳募枋龇系闹羔槨? exceptfds 是一個指向異常文件描述符集合的指針。
這幾個其實都是輸出輸出型參數(shù)
我們要傳入的是一個fd_set類型的指針
這些參數(shù)在輸入的時候分別表示:
我們是否關(guān)心讀就緒
我們是否關(guān)心寫就緒
我們是否關(guān)心有異常
在輸出的時候分別表示:
哪些讀就緒了
哪些寫就緒了
出現(xiàn)哪些異常了
3.timeval *timeout
timeval實際上是一個結(jié)構(gòu)體 在Linux系統(tǒng)中 它的定義如下:
struct timeval {
time_t tv_sec; // 當前時間的秒
suseconds_t tv_usec; // 當前時間的微秒
};
我們讓select進行等待的時候 有三種模式可以供我們選擇:
阻塞式
非阻塞式
阻塞一段時間 之后返回
對于這個參數(shù)來說:
如果我輸入nullptr 那么它就是阻塞式的
如果我們輸入結(jié)構(gòu)體 {0 , 0} 那么它就是非阻塞式的
如果我們輸入結(jié)構(gòu)體{5, 0}那么它就會等待五秒鐘之后返回,但是如果說五秒內(nèi)有文件描述符就緒了的話,這個參數(shù)就會顯示出輸出性。比如說我們要求等五秒,而實際上2秒就有文件描述符就緒了,那么它就會返回{3 , 0}。
返回值類型:int
表示的是就緒的文件描述符的個數(shù)
只要讓我們等待的文件描述符中 有一個就緒了 它就會返回
2.2fd_set理解
fd_set 叫做文件描述符集,它本質(zhì)上是一個位圖 。
系統(tǒng)提供了四個函數(shù)來讓我們進行文件描述符集操作它們的作用如下:
- 清除某個文件描述符
- 判斷某個文件描述符是否被設(shè)置
- 設(shè)置文件描述符
- 清空文件描述符
我們舉個具體的使用場景:
fd_set *readfds
當它作為一個輸入?yún)?shù)時
- 它是用戶通知內(nèi)核的一種方式
- 在比特位中 比特位的下標表示文件描述符
- 比特位下標對應的內(nèi)容是否為1表示我對于該文件的讀是否關(guān)心
- 比如 0101 就是我對于2號和0號文件描述符的讀關(guān)心
當它作為一個輸出參數(shù)時
- 它是內(nèi)核通知用戶的一種方式
- 在比特位中 比特位的下標表示文件描述符
- 比特位下標對應的內(nèi)容是否為1表示該文件描述符的讀是否就緒
- 比如說 0100 就是用戶讓系統(tǒng)關(guān)心的0號和2號文件描述符中 2號文件描述符就緒了
至于fd_set *writefds fd_set *exceptfds
通知的內(nèi)容分別變成了 :是否關(guān)心寫,是否關(guān)心異常
2.3完整例子代碼(會在代碼中進行講解)
main.cc
#include "selectServer.hpp"
#include <memory>
int main()
{
// 1. fd_set是一個固定大小位圖,直接決定了select能同時關(guān)心的fd的個數(shù)是有上限的!
// std::cout << sizeof(fd_set) * 8 << std::endl;
std::unique_ptr<SelectServer> svr(new SelectServer());
svr->Start();
return 0;
}
Log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志級別的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等級 時間 支持用戶自定義(日志內(nèi)容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
// va_list ap;
// va_start(ap, format);
// while()
// int x = va_arg(ap, int);
// va_end(ap); //ap=nullptr
char stdBuffer[1024]; //標準部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定義部分
va_list args;
va_start(args, format);
// vprintf(format, args);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
// FILE *fp = fopen(LOGFILE, "a");
printf("%s%s\n", stdBuffer, logBuffer);
// fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
// fclose(fp);
}
Sock.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
class Sock
{
private:
// listen的第二個參數(shù),意義:底層全連接隊列的長度 = listen的第二個參數(shù)+1
const static int gbacklog = 10;
public:
Sock() {}
static int Socket()
{
int listensock = socket(AF_INET, SOCK_STREAM, 0);
if (listensock < 0)
{
exit(2);
}
int opt = 1;
setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
return listensock;
}
static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
{
struct sockaddr_in local;
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
exit(3);
}
}
static void Listen(int sock)
{
if (listen(sock, gbacklog) < 0)
{
exit(4);
}
}
// 一般經(jīng)驗
// const std::string &: 輸入型參數(shù)
// std::string *: 輸出型參數(shù)
// std::string &: 輸入輸出型參數(shù)
static int Accept(int listensock, std::string *ip, uint16_t *port)
{
struct sockaddr_in src;
socklen_t len = sizeof(src);
int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
if (servicesock < 0)
{
return -1;
}
if(port) *port = ntohs(src.sin_port);
if(ip) *ip = inet_ntoa(src.sin_addr);
return servicesock;
}
static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(server_port);
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
else return false;
}
~Sock() {}
};
selectServer.hpp
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include <iostream>
#include <string>
#include <vector>
#include <sys/select.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"
#define BITS 8
#define NUM (sizeof(fd_set)*BITS)
#define FD_NONE -1
using namespace std;
// select 我們只完成讀取,寫入和異常不做處理 -- epoll(寫完整)
class SelectServer
{
public:
SelectServer(const uint16_t &port = 8080) : _port(port)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG,"%s","create base socket success");
for(int i = 0; i < NUM; i++) _fd_array[i] = FD_NONE;
// 規(guī)定 : _fd_array[0] = _listensock;
_fd_array[0] = _listensock;
}
void Start()
{
while (true)
{
// struct timeval timeout = {0, 0};
// 如何看待listensock? 獲取新連接,我們把它依舊看做成為IO,input事件,如果沒有連接到來呢?阻塞
// int sock = Sock::Accept(listensock, ...); //不能直接調(diào)用accept了
// 將listensock添加到讀文件描述符集中
// FD_SET(_listensock, &rfds);
// int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);
// 1. nfds: 隨著我們獲取的sock越來越多,隨著我們添加到select的sock越來越多,注定了nfds每一次都可能要變化,我們需要對它動態(tài)計算
// 2. rfds/writefds/exceptfds:都是輸入輸出型參數(shù),輸入輸出不一定以一樣的,所以注定了我們每一次都要對rfds進行重新添加
// 3. timeout: 都是輸入輸出型參數(shù),每一次都要進行重置,前提是你要的話
// 1,2 => 注定了我們必須自己將合法的文件描述符需要單獨全部保存起來 用來支持:1. 更新最大fd 2.更新位圖結(jié)構(gòu)
DebugPrint();
fd_set rfds;
FD_ZERO(&rfds);
int maxfd = _listensock;
for(int i = 0; i < NUM; i++)
{
if(_fd_array[i] == FD_NONE) continue;
FD_SET(_fd_array[i], &rfds);
if(maxfd < _fd_array[i]) maxfd = _fd_array[i];
}
// rfds未來,一定會有兩類sock,listensock,普通sock
// 我們select中,就緒的fd會越來越多!
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
// printf("hello select ...\n");
logMessage(DEBUG, "%s", "time out...");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
break;
default:
// 成功的
logMessage(DEBUG, "get a new link event..."); // 為什么會一直打印連接到來呢?連接已經(jīng)建立完成,就緒了,但是你沒有取走,select要一直通知你!
HandlerEvent(rfds);
break;
}
}
}
~SelectServer()
{
if (_listensock >= 0)
close(_listensock);
}
private:
void HandlerEvent(const fd_set &rfds) // fd_set 是一個集合,里面可能會存在多個sock
{
for(int i = 0; i < NUM; i++)
{
// 1. 去掉不合法的fd
if(_fd_array[i] == FD_NONE) continue;
// 2. 合法的就一定就緒了?不一定
if(FD_ISSET(_fd_array[i], &rfds))
{
//指定的fd,讀事件就緒
// 讀事件就緒:連接時間到來,accept
if(_fd_array[i] == _listensock) Accepter();
else Recver(i);
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的讀事件就緒了,表示可以讀取了
// 獲取新連接了
int sock = Sock::Accept(_listensock, &clientip, &clientport); // 這里在進行accept會不會阻塞?不會!
if(sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
// read / recv? 不能!為什么不能?我們不清楚該sock上面數(shù)據(jù)什么時候到來, recv、read就有可能先被阻塞,IO = 等+數(shù)據(jù)拷貝
// 誰可能最清楚呢?select!
// 得到新連接的時候,此時我們應該考慮的是,將新的sock托管給select,讓select幫我們進行檢測sock上是否有新的數(shù)據(jù)
// 有了數(shù)據(jù)select,讀事件就緒,select就會通知我,我們在進行讀取,此時我們就不會被阻塞了
// 要將sock添加 給 select, 其實我們只要將fd放入到數(shù)組中即可!
int pos = 1;
for(; pos < NUM; pos++){
if(_fd_array[pos] == FD_NONE) break;
}
if(pos == NUM){
logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
close(sock);
}else{
_fd_array[pos] = sock;
}
}
void Recver(int pos)
{
// 讀事件就緒:INPUT事件到來、recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fd_array[pos]);
// 暫時先不做封裝, 此時select已經(jīng)幫我們進行了事件檢測,fd上的數(shù)據(jù)一定是就緒的,即 本次 不會被阻塞
// 這樣讀取有bug嗎?有的,你怎么保證以讀到了一個完整包文呢?
char buffer[1024];
int n = recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);
if(n > 0){
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);
}
else if(n == 0){
logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
// 1. 我們也要關(guān)閉不需要的fd
close(_fd_array[pos]);
// 2. 不要讓select幫我關(guān)心當前的fd了
_fd_array[pos] = FD_NONE;
}
else{
logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
// 1. 我們也要關(guān)閉不需要的fd
close(_fd_array[pos]);
// 2. 不要讓select幫我關(guān)心當前的fd了
_fd_array[pos] = FD_NONE;
}
}
void DebugPrint()
{
cout << "_fd_array[]: ";
for(int i = 0; i < NUM; i++)
{
if(_fd_array[i] == FD_NONE) continue;
cout << _fd_array[i] << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
int _fd_array[NUM];
// int _fd_write[NUM];
// std::vector<int> arr;
};
#endif
2.4優(yōu)缺點
優(yōu)點:
- 效率高 IO等的時間少 尤其是在有大量連接 并且只有少量活躍的情況下
- 單進程 占用資源少
缺點:
- 為了維護第三方數(shù)組 select服務器充滿大量的遍歷操作
- 每一次都要對select參數(shù)進行重新設(shè)定
- 能夠同時管理的fd的個數(shù)是有上限的
- 由于參數(shù)是輸入輸出的 所以避免不了大量用戶和內(nèi)核之間的拷貝
- 編碼比較復雜
3.多路轉(zhuǎn)接之poll
poll是系統(tǒng)提供的一個多路轉(zhuǎn)接接口,它的作用和select函數(shù)基本一致。
3.1poll函數(shù)的介紹
原形:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd *fds
里面包含著文件描述符表,我們需要監(jiān)視的文件描述符合集和就緒的文件描述符合集
- fd 特定的文件描述符值
- events 用戶告訴內(nèi)核 哪些事件需要關(guān)心
- revents 內(nèi)核告訴用戶 哪些事件就緒了
(也是用了位圖結(jié)構(gòu)來存儲數(shù)據(jù))
一個文件描述符實際上就是對應一個struct pollfd
,所以說理論上只要有多少個數(shù)組我們的poll就能檢測多少的文件描述符。
以下是events和revents的取值:
我們需要特別注意的有三個分別是:
- POLLIN 可讀
- POLLOUT 可寫
- POLLERR 錯誤
nfds_t nfds
fds數(shù)組的長度
timeout
超時時間
- 單位是毫秒 比如說我們設(shè)置為1000 就是等待1秒
- 如果設(shè)置為0 就表示非阻塞模式
- 如果設(shè)置為-1 就表示阻塞模式
3.2poll服務器
我們將上面寫的select的服務器修改一下:
私有成員變化如下:
private:
int _port;
int _listensock;
struct pollfd *_rfds;
func_t _func;
對比于我們select的第三方數(shù)組來說,我們這里多了一個數(shù)組指針和數(shù)組大小.
在初始化的時候 我們首先new出一個 struct pollfd
數(shù)組出來 ,并且遍歷初始化一下.
_rfds[i].fd = defaultfd;
_rfds[i].events = 0;
_rfds[i].revents = 0;
對于數(shù)據(jù)如何判斷就緒,我們可以使用按位與來判斷:
_rfds[i].revents & POLLIN
具體代碼如下:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "sock.hpp"
namespace select_ns
{
static const int defaultport = 8081;
static const int fdnum = sizeof(fd_set) * 8;
static const int defaultfd = -1;
using func_t = std::function<std::string (const std::string&)>;
class SelectServer
{
public:
SelectServer(func_t f, int port = defaultport) : func(f), _port(port), _listensock(-1), fdarray(nullptr)
{
}
void initServer()
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
fdarray = new int[fdnum];
for (int i = 0; i < fdnum; i++)
fdarray[i] = defaultfd;
fdarray[0] = _listensock; // 不變了
}
void Print()
{
std::cout << "fd list: ";
for (int i = 0; i < fdnum; i++)
{
if (fdarray[i] != defaultfd)
std::cout << fdarray[i] << " ";
}
std::cout << std::endl;
}
void Accepter(int listensock)
{
logMessage(DEBUG, "Accepter in");
// 走到這里,accept 函數(shù),會不會阻塞???1 0
// select 告訴我, listensock讀事件就緒了
std::string clientip;
uint16_t clientport = 0;
int sock = Sock::Accept(listensock, &clientip, &clientport); // accept = 等 + 獲取
if (sock < 0)
return;
logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);
// sock我們能直接recv/read 嗎?不能,整個代碼,只有select有資格檢測事件是否就緒
// 將新的sock 托管給select!
// 將新的sock托管給select的本質(zhì),其實就是將sock,添加到fdarray數(shù)組中即可!
int i = 0;
for (; i < fdnum; i++)
{
if (fdarray[i] != defaultfd)
continue;
else
break;
}
if (i == fdnum)
{
logMessage(WARNING, "server if full, please wait");
close(sock);
}
else
{
fdarray[i] = sock;
}
Print();
logMessage(DEBUG, "Accepter out");
}
void Recver(int sock, int pos)
{
logMessage(DEBUG, "in Recver");
// 1. 讀取request
// 這樣讀取是有問題的!
char buffer[1024];
ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0); // 這里在進行讀取的時候,會不會被阻塞?1, 0
if (s > 0)
{
buffer[s] = 0;
logMessage(NORMAL, "client# %s", buffer);
}
else if (s == 0)
{
close(sock);
fdarray[pos] = defaultfd;
logMessage(NORMAL, "client quit");
return;
}
else
{
close(sock);
fdarray[pos] = defaultfd;
logMessage(ERROR, "client quit: %s", strerror(errno));
return;
}
// 2. 處理request
std::string response = func(buffer);
// 3. 返回response
// write bug
write(sock, response.c_str(), response.size());
logMessage(DEBUG, "out Recver");
}
// 1. handler event rfds 中,不僅僅是有一個fd是就緒的,可能存在多個
// 2. 我們的select目前只處理了read事件
void HandlerReadEvent(fd_set &rfds)
{
for (int i = 0; i < fdnum; i++)
{
// 過濾掉非法的fd
if (fdarray[i] == defaultfd)
continue;
// 正常的fd
// 正常的fd不一定就緒了
// 目前一定是listensock,只有這一個
if (FD_ISSET(fdarray[i], &rfds) && fdarray[i] == _listensock)
Accepter(_listensock);
else if(FD_ISSET(fdarray[i], &rfds))
Recver(fdarray[i], i);
else{}
}
}
void start()
{
for (;;)
{
fd_set rfds;
// fd_set wfds;
FD_ZERO(&rfds);
int maxfd = fdarray[0];
for (int i = 0; i < fdnum; i++)
{
if (fdarray[i] == defaultfd)
continue;
FD_SET(fdarray[i], &rfds); // 合法 fd 全部添加到讀文件描述符集中
if (maxfd < fdarray[i])
maxfd = fdarray[i]; // 更新所有fd中最大的fd
}
logMessage(NORMAL, "max fd is: %d", maxfd);
// struct timeval timeout = {1, 0};
// int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout); // ??
// 一般而言,要是用select,需要程序員自己維護一個保存所有合法fd的數(shù)組!
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr); // ??
switch (n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "select error, code: %d, err string: %s", errno, strerror(errno));
break;
default:
// 說明有事件就緒了,目前只有一個監(jiān)聽事件就緒了
logMessage(NORMAL, "have event ready!");
HandlerReadEvent(rfds);
// HandlerWriteEvent(wfds);
break;
}
// std::string clientip;
// uint16_t clientport = 0;
// int sock = Sock::Accept(_listensock, &clientip, &clientport); // accept = 等 + 獲取
// if(sock<0) continue;
// // 開始進行服務器的處理邏輯
}
}
~SelectServer()
{
if (_listensock < 0)
close(_listensock);
if (fdarray)
delete[] fdarray;
}
private:
int _port;
int _listensock;
int *fdarray;
func_t func;
};
}
3.3優(yōu)缺點
優(yōu)點:
- 效率高
- 適合有大量連接 少量活躍
- 輸入輸出分離,接口使用方便
- poll參數(shù)級別 沒有可管理的fd上限
缺點:
- poll依舊需要不少的遍歷
- poll需要內(nèi)核到用戶的拷貝
- poll的代碼雖然比select容易 但是也很復雜
4.epoll
4.1初始epoll
epoll是為了處理大量句柄而做出改進的poll(句柄可以是一個整數(shù)、指針、引用或其他數(shù)據(jù)結(jié)構(gòu),它用于唯一標識和訪問特定資源或?qū)ο?。?/p>
它在2.5.44內(nèi)核中被引入到Linux
也是目前來說最常用的一種多路轉(zhuǎn)接IO方式
4.2epoll相關(guān)的系統(tǒng)調(diào)用
- epoll_create
- epoll_ctl
- epoll_wait
int epoll_create(int size); //創(chuàng)建一個epoll模型
參數(shù)說明:
目前來說 epoll_create的參數(shù)是被廢棄的 我們設(shè)置為256或者512就行 這樣設(shè)計的原因是為了向前兼容
返回值說明:
返回一個epoll模型 (實際上就是一個文件描述符)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);//對創(chuàng)建出來的epoll模型進行操控
參數(shù)說明:
1.int epfd 標識一個我們的IO模型
2.int op (operator) 表示我們想要做出什么樣的操作
3.int fd 表示我們需要添加的文件描述符
4.epoll_event *event 表示我們需要關(guān)心哪些事件
返回值說明:
函數(shù)成功調(diào)用返回0 失敗返回-1 同時錯誤碼將被設(shè)置
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
//監(jiān)視我們關(guān)心的關(guān)鍵描述符
參數(shù)說明:
1.epfd:是 epoll 實例的文件描述符,它代表你要監(jiān)聽的一組文件描述符的集合。
2.events:是一個指向 struct epoll_event 數(shù)組的指針,用于存儲就緒事件的信息。
3.maxevents:指定 events 數(shù)組的最大大小,即最多可以存儲多少就緒事件的信息。
4.timeout:指定等待事件的超時時間,單位為毫秒。傳遞負值會使 epoll_wait 成為阻塞調(diào)用,直到有事件發(fā)生,傳遞零會使它成為非阻塞調(diào)用,立即返回,傳遞正值會在指定時間內(nèi)等待事件。
返回值:epoll_wait 返回就緒事件的數(shù)量,如果發(fā)生錯誤,則返回 -1。如果超時時間到期而沒有事件發(fā)生,它將返回 0。
poll_wait 返回的事件信息存儲在 events 數(shù)組中。每個 struct epoll_event 結(jié)構(gòu)包含以下信息:
events:一個位掩碼,指示事件類型,如可讀、可寫、錯誤等。
data:一個聯(lián)合,可以存儲用戶定義的數(shù)據(jù),通常是文件描述符或其他標識符。
4.3epoll的工作原理
我們之前的學習的多路轉(zhuǎn)接函數(shù) 無論是select還有poll 它們都需要我們做下面的操作:
- 讓我們維護一個第三方的數(shù)組
- 都需要遍歷整個數(shù)組
- 都需要經(jīng)歷用戶到內(nèi)核 內(nèi)核到用戶的事件通知
而我們的epoll則不同。
當然在我們講解epoll的具體工作原理時我們需要先了解一些前置知識:
操作系統(tǒng)是如何知道硬件里面有數(shù)據(jù)了呢?
下圖很好的解釋了這一問題:
正式講解 |
---|
當我們創(chuàng)建一個epoll模型之后操作系統(tǒng)底層會幫助我們維護一顆紅黑樹
紅黑樹的節(jié)點里面維護著很多元素 其中最重要的是兩個:
- 文件描述符
- 事件
這顆紅黑樹解決了用戶通知內(nèi)核的問題。
用戶通知內(nèi)核自己要關(guān)心哪些文件描述符的哪些事件之后,操作系統(tǒng)就會生成一個節(jié)點然后插入到這顆紅黑樹當中
而這顆紅黑樹就是對應我們select和poll當中的數(shù)組。(現(xiàn)在由操作系統(tǒng)維護了)
當內(nèi)核通知用戶的則是通過了消息隊列通知:
在內(nèi)核維護的紅黑樹旁邊有一個消息隊列(也交就緒隊列),每當有fd的事件就緒的時候就會在該隊列上添加一個元素(也是由操作系統(tǒng)維護)。
操作系統(tǒng)在調(diào)用驅(qū)動的時候構(gòu)建就緒隊列節(jié)點:
在生成紅黑樹節(jié)點的時候,在驅(qū)動中,每個節(jié)點都會生成一個自己的回調(diào)函數(shù)。
于是在經(jīng)歷了硬件中斷到讀取數(shù)據(jù)的過程后,操作系統(tǒng)會調(diào)用驅(qū)動中的回調(diào)函數(shù)來獲取該節(jié)點的數(shù)據(jù) ,并且根據(jù)這些數(shù)據(jù)(fd和events)構(gòu)建就緒節(jié)點,最后將構(gòu)建好的節(jié)點插入到隊列中。
知道了這些后,不妨再來看看我們上面提到的函數(shù):
- epoll_create : 創(chuàng)建epoll模型 包括紅黑樹 就緒隊列 回調(diào)函數(shù)等(這個描述符所對應的文件里面有指針可以找到紅黑樹和就緒隊列)
- epoll_ctl : 對于紅黑樹的節(jié)點進行注冊
- epoll_wait : 獲取就緒隊列中的內(nèi)容
4.4epoll的工作方式
epoll有2種工作方式-水平觸發(fā)(LT)和邊緣觸發(fā)(ET)
你正在吃雞, 眼看進入了決賽圈, 你媽飯做好了, 喊你吃飯的時候有兩種方式:
- 如果你媽喊你一次, 你沒動, 那么你媽會繼續(xù)喊你第二次, 第三次…(親媽, 水平觸發(fā))
- 如果你媽喊你一次, 你沒動, 你媽就不管你了(后媽, 邊緣觸發(fā))
舉個例子來幫大家徹底了解這兩種模式:
-
我們已經(jīng)把一個tcp socket添加到epoll描述符
-
這個時候socket的另一端被寫入了2KB的數(shù)據(jù)
-
調(diào)用epoll_wait,并且它會返回. 說明它已經(jīng)準備好讀取操作
-
然后調(diào)用read, 只讀取了1KB的數(shù)據(jù)
-
繼續(xù)調(diào)用epoll_wait…
水平觸發(fā)Level Triggere 工作模式:
epoll默認狀態(tài)下就是LT工作模式.
當epoll檢測到socket上事件就緒的時候, 可以不立刻進行處理. 或者只處理一部分.
如上面的例子, 由于只讀了1K數(shù)據(jù), 緩沖區(qū)中還剩1K數(shù)據(jù), 在第二次調(diào)用 epoll_wait 時, epoll_wait
仍然會立刻返回并通知socket讀事件就緒.
直到緩沖區(qū)上所有的數(shù)據(jù)都被處理完, epoll_wait 才不會立刻返回.
支持阻塞讀寫和非阻塞讀寫
邊緣觸發(fā)Edge Triggered工作模式:
如果我們在第1步將socket添加到epoll描述符的時候使用了EPOLLET標志, epoll進入ET工作模式.
當epoll檢測到socket上事件就緒時, 必須立刻處理.
如上面的例子, 雖然只讀了1K的數(shù)據(jù), 緩沖區(qū)還剩1K的數(shù)據(jù), 在第二次調(diào)用 epoll_wait 的時候,
epoll_wait 不會再返回了.
也就是說, ET模式下, 文件描述符上的事件就緒后, 只有一次處理機會.
ET的性能比LT性能更高( epoll_wait 返回的次數(shù)少了很多). Nginx默認采用ET模式使用epoll.
只支持非阻塞的讀寫
select和poll其實也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET.
4.5LT模式重要代碼
tcp_epoll_server.hpp
#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"
typedef std::function<void(const std::string&, std::string* resp)> Handler;
class Epoll {
public:
Epoll() {
epoll_fd_ = epoll_create(10);
}
~Epoll() {
close(epoll_fd_);
}
bool Add(const TcpSocket& sock) const {
int fd = sock.GetFd();
printf("[Epoll Add] fd = %d\n", fd);
epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLIN;
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
if (ret < 0) {
perror("epoll_ctl ADD");
return false;
}
return true;
}
bool Del(const TcpSocket& sock) const {
int fd = sock.GetFd();
printf("[Epoll Del] fd = %d\n", fd);
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
if (ret < 0) {
perror("epoll_ctl DEL");
return false;
}
return true;
}
bool Wait(std::vector<TcpSocket>* output) const {
output->clear();
epoll_event events[1000];
int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
if (nfds < 0) {
perror("epoll_wait");
return false;
}
// [注意!] 此處必須是循環(huán)到 nfds, 不能多循環(huán)
for (int i = 0; i < nfds; ++i) {
TcpSocket sock(events[i].data.fd);
output->push_back(sock);
}
return true;
}
private:
int epoll_fd_;
};
class TcpEpollServer {
public:
TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) {
}
bool Start(Handler handler) {
// 1. 創(chuàng)建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 綁定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 監(jiān)聽
CHECK_RET(listen_sock.Listen(5));
// 4. 創(chuàng)建 Epoll 對象, 并將 listen_sock 加入進去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 進入事件循環(huán)
for (;;) {
// 6. 進行 epoll_wait
std::vector<TcpSocket> output;
if (!epoll.Wait(&output)) {
continue;
}
// 7. 根據(jù)就緒的文件描述符的種類決定如何處理
for (size_t i = 0; i < output.size(); ++i) {
if (output[i].GetFd() == listen_sock.GetFd()) {
// 如果是 listen_sock, 就調(diào)用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock);
}
else {
// 如果是 new_sock, 就進行一次讀寫
std::string req, resp;
bool ret = output[i].Recv(&req);
if (!ret) {
// [注意!!] 需要把不用的 socket 關(guān)閉
// 先后順序別搞反. 不過在 epoll 刪除的時候其實就已經(jīng)關(guān)閉 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].Send(resp);
} // end for
} // end for (;;)
}
return true;
}
private:
std::string ip_;
uint16_t port_;
};
4.6ET模式重要代碼
基于 LT 版本稍加修改即可
- 修改 tcp_socket.hpp, 新增非阻塞讀和非阻塞寫接口
- 對于 accept 返回的 new_sock 加上 EPOLLET 這樣的選項
注意: 此代碼暫時未考慮 listen_sock ET 的情況. 如果將 listen_sock 設(shè)為 ET, 則需要非阻塞輪詢的方式 accept. 否則會導致同一時刻大量的客戶端同時連接的時候, 只能 accept 一次的問題.
tcp_socket.hpp
// 以下代碼添加在 TcpSocket 類中
// 非阻塞 IO 接口
bool SetNoBlock() {
int fl = fcntl(fd_, F_GETFL);
if (fl < 0) {
perror("fcntl F_GETFL");
return false;
}
int ret = fcntl(fd_, F_SETFL, fl | O_NONBLOCK);
if (ret < 0) {
perror("fcntl F_SETFL");
return false;
}
return true;
}
bool RecvNoBlock(std::string* buf) const {
// 對于非阻塞 IO 讀數(shù)據(jù), 如果 TCP 接受緩沖區(qū)為空, 就會返回錯誤
// 錯誤碼為 EAGAIN 或者 EWOULDBLOCK, 這種情況也是意料之中, 需要重試
// 如果當前讀到的數(shù)據(jù)長度小于嘗試讀的緩沖區(qū)的長度, 就退出循環(huán)
// 這種寫法其實不算特別嚴謹(沒有考慮粘包問題)
buf->clear();
char tmp[1024 * 10] = { 0 };
for (;;) {
ssize_t read_size = recv(fd_, tmp, sizeof(tmp) - 1, 0);
if (read_size < 0) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
continue;
}
perror("recv");
比特就業(yè)課
tcp_epoll_server.hpp
return false;
}
if (read_size == 0) {
// 對端關(guān)閉, 返回 false
return false;
}
tmp[read_size] = '\0';
*buf += tmp;
if (read_size < (ssize_t)sizeof(tmp) - 1) {
break;
}
}
return true;
}
bool SendNoBlock(const std::string& buf) const {
// 對于非阻塞 IO 的寫入, 如果 TCP 的發(fā)送緩沖區(qū)已經(jīng)滿了, 就會出現(xiàn)出錯的情況
// 此時的錯誤號是 EAGAIN 或者 EWOULDBLOCK. 這種情況下不應放棄治療
// 而要進行重試
ssize_t cur_pos = 0; // 記錄當前寫到的位置
ssize_t left_size = buf.size();
for (;;) {
ssize_t write_size = send(fd_, buf.data() + cur_pos, left_size, 0);
if (write_size < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 重試寫入
continue;
}
return false;
}
cur_pos += write_size;
left_size -= write_size;
// 這個條件說明寫完需要的數(shù)據(jù)了
if (left_size <= 0) {
break;
}
}
return true;
}
tcp_epoll_server.hpp
#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"
typedef std::function<void(const std::string&, std::string* resp)> Handler;
class Epoll {
public:
Epoll() {
epoll_fd_ = epoll_create(10);
}
~Epoll() {
close(epoll_fd_);
}
bool Add(const TcpSocket& sock, bool epoll_et = false) const {
int fd = sock.GetFd();
printf("[Epoll Add] fd = %d\n", fd);
epoll_event ev;
ev.data.fd = fd;
if (epoll_et) {
ev.events = EPOLLIN | EPOLLET;
}
else {
ev.events = EPOLLIN;
}
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
if (ret < 0) {
perror("epoll_ctl ADD");
return false;
}
return true;
}
bool Del(const TcpSocket& sock) const {
int fd = sock.GetFd();
printf("[Epoll Del] fd = %d\n", fd);
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
if (ret < 0) {
perror("epoll_ctl DEL");
return false;
}
return true;
}
bool Wait(std::vector<TcpSocket>* output) const {
output->clear();
epoll_event events[1000];
int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
if (nfds < 0) {
perror("epoll_wait");
return false;
}
// [注意!] 此處必須是循環(huán)到 nfds, 不能多循環(huán)
for (int i = 0; i < nfds; ++i) {
TcpSocket sock(events[i].data.fd);
output->push_back(sock);
}
return true;
}
private:
int epoll_fd_;
};
class TcpEpollServer {
public:
TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) {
}
bool Start(Handler handler) {
// 1. 創(chuàng)建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 綁定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 監(jiān)聽
CHECK_RET(listen_sock.Listen(5));
// 4. 創(chuàng)建 Epoll 對象, 并將 listen_sock 加入進去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 進入事件循環(huán)
for (;;) {
// 6. 進行 epoll_wait
std::vector<TcpSocket> output;
if (!epoll.Wait(&output)) {
continue;
}
// 7. 根據(jù)就緒的文件描述符的種類決定如何處理
for (size_t i = 0; i < output.size(); ++i) {
if (output[i].GetFd() == listen_sock.GetFd()) {
// 如果是 listen_sock, 就調(diào)用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock, true);
}
else {
// 如果是 new_sock, 就進行一次讀寫
std::string req, resp;
bool ret = output[i].RecvNoBlock(&req);
if (!ret) {
// [注意!!] 需要把不用的 socket 關(guān)閉
// 先后順序別搞反. 不過在 epoll 刪除的時候其實就已經(jīng)關(guān)閉 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].SendNoBlock(resp);
printf("[client %d] req: %s, resp: %s\n", output[i].GetFd(),
req.c_str(), resp.c_str());
} // end for
} // end for (;;)
}
return true;
}
private:
std::string ip_;
uint16_t port_;
};
onst std::string& ip, uint16_t port) : ip_(ip), port_(port) {
}
bool Start(Handler handler) {
// 1. 創(chuàng)建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 綁定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 監(jiān)聽
CHECK_RET(listen_sock.Listen(5));
// 4. 創(chuàng)建 Epoll 對象, 并將 listen_sock 加入進去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 進入事件循環(huán)
for (;?? {
// 6. 進行 epoll_wait
std::vector output;
if (!epoll.Wait(&output)) {
continue;
}
// 7. 根據(jù)就緒的文件描述符的種類決定如何處理
for (size_t i = 0; i < output.size(); ++i) {
if (output[i].GetFd() == listen_sock.GetFd()) {
// 如果是 listen_sock, 就調(diào)用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock, true);
}
else {
// 如果是 new_sock, 就進行一次讀寫
std::string req, resp;
bool ret = output[i].RecvNoBlock(&req);
if (!ret) {
// [注意!!] 需要把不用的 socket 關(guān)閉
// 先后順序別搞反. 不過在 epoll 刪除的時候其實就已經(jīng)關(guān)閉 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].SendNoBlock(resp);
printf(“[client %d] req: %s, resp: %s\n”, output[i].GetFd(),
req.c_str(), resp.c_str());
} // end for
} // end for (;??
}
return true;
}文章來源:http://www.zghlxwxcb.cn/news/detail-745171.html
private:
std::string ip_;
uint16_t port_;
};文章來源地址http://www.zghlxwxcb.cn/news/detail-745171.html
到了這里,關(guān)于Linux中的高級IO的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!