目錄
版本四:線程池
注意事項
文件:Task.hpp -- 任務單獨為一個文件
組件:日志修改
新函數(shù):vprintf()
可變參數(shù)的提取邏輯
vfprintf()的工作原理
初始化一個va_list
日志準備
獲取時間小知識
日志初版
日志啟動測試
TCP通用服務器(守護進程)?*
新指令1:jobs -- 查看進程作業(yè)
新指令2:fg --?foreground(前臺)
新指令3:bg(Ctrl + Z) -- background(后臺)
創(chuàng)建新的會話
新接口2:daemon() -- 選用(本文不用)
新接口3:setsid() *
小細節(jié)1:null文件 -- 任何請求都可以被接收,但是都會被丟棄
接口1:open() -- 打開文件
接口2:dup2 -- 重定向
接口3:chdir() -- 進程執(zhí)行路徑更改(選填)
文件:daemon.hpp(守護進程)
測試:守護進程
測試服務器的正常回顯
日志修改:日志寫入文件中保存
全部代碼文件
daemon.hpp -- 守護進程
log.hpp -- 日志文件
makefile
tcpClient.cc -- 客戶端1
tcpClient.hpp -- 客戶端2
tcpServer.cc -- 服務端1
tcpServer.hpp -- 服務端2
Task.hpp --?形成任務?
Thread.hpp -- 線程池1
ThreadPool.hpp -- 線程池2
LockGuard.hpp -- 線程池(加鎖部分)
接上文:
版本四:線程池
????????至此,多線程與多進程的版本完成,但是由于無論是線程的創(chuàng)建還是進程的創(chuàng)建,都是事情到來的時候(鏈接到來的時候)才創(chuàng)建任務,所以會存在一個頻繁創(chuàng)建的問題。還有一個問題,就是客戶有多少,就需要多少個進程或者線程,那一個服務器的效率就不會很高,應付幾十、幾百可以,但是一但多一點就不行了,為此這里就引入一個基于管道式的線程池的組件來實現(xiàn)第四個版本
????????一個主線程打開的文件,新線程是可以看到的,也就是說線程池對于資源共享有天然的優(yōu)勢,于是可以把新鏈接構成一個新任務,把任務傳遞給線程池來統(tǒng)一執(zhí)行
? ? ? ? 注意這里就直接引入一個線程池組件進行使用,就不再講解了
? ? ? ? 關于線程池的介紹可以參考文章 -- 待更新
????????這里的線程池是一個單例模式
注意事項
? ? ? ? 注意線程池的版本對于代碼是有整體修改的,所以下面的代碼會進行一定程度的整理和修改
????????因為這里的serviceIO服務和類里面是沒有任何關系的,所以代碼可以直接拿出去,將其放入Task.hpp文件中去
文件:Task.hpp -- 任務單獨為一個文件
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
// 因為這里的服務和類里面是沒有任何關系的,所以代碼可以直接拿出去
void serviceIO(int sock)
{
char buffer[1024];
while (true)
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
// 目前我們把讀到的數(shù)據(jù)當成字符串, 截止目前
buffer[n] = 0;
std::cout << "recv message: " << buffer << std::endl;
std::string outbuffer = buffer;
outbuffer += " server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size()); // 多路轉(zhuǎn)接
}
else if (n == 0)
{
// 代表client退出
logMessage(NORMAL, "client quit, me too!");
break;
}
}
close(sock);
}
class Task
{
using func_t = std::function<void(int)>;
public:
Task()
{
}
Task(int sock, func_t func)
: _sock(sock), _callback(func)
{
}
void operator()()
{
_callback(_sock);
}
private:
int _sock;
func_t _callback;
};
組件:日志修改
新函數(shù):vprintf()
頭文件:<stdarg.h>
首先使用函數(shù)前要搞明白可變參數(shù)列表
可變參數(shù)
提出問題:實現(xiàn)下面的寫法
logMessage(NORMAL, "create socket success: %d", _listensock);
為了至此這種可變參數(shù),需要幾個宏
va_list
參考文獻:va_list_百度百科
可變參數(shù)的提取邏輯
?
關于可變參數(shù)的提取模擬網(wǎng)上也大有代碼可以提供參考,這里就不再贅述了
vfprintf()的工作原理
?
????????vfprintf()會直接向文件中寫入,為了后序顯示打印結(jié)果,這里直接使用vsnfprintf(),工作原理大差不差
初始化一個va_list
?
日志準備
?
獲取時間小知識
獲取時間的方式有很多種比如:
?
日志初版
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <ctime>
#include <unistd.h>
// 定義五種不同的信息
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3 // 一種不影響服務器的錯誤
#define FATAL 4 // 致命錯誤
const char *to_levelstr(int level)
{
switch (level) // 這里直接return了
{
case DEBUG:
return "DEBUG";
case NORMAL:
return "NORMAL";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return nullptr;
}
}
// voidlogMessage(DEBUG, "hello %f, %d, %c", 3.14, 10, 'C');
void logMessage(int level, const char *format, ...)
{
// 格式如下
// [日志等級] [時間戳/時間] [pid] [message]
// [FATAL0] [2023-06-11 16:46:07] [123] [創(chuàng)建套接字失敗]
// 可變參數(shù)的提取邏輯
/* va_list start;
va_start(start);
while (*p)
{
switch (*p)
{
case '%':
p++;
if (*p == 'f')
arg = va_arg(start, float);
...
}
}
va_end(start); */
#define NUM 1024
// 獲取前綴信息[日志等級] [時間戳/時間] [pid]
char logprefix[NUM];
snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",
to_levelstr(level), (long int)time(nullptr), getpid());
// 獲取內(nèi)容
char logcontent[NUM];
va_list arg;
va_start(arg, logcontent); // 因為壓棧是反過來的,所以直接使用左邊那個參數(shù)就行了
vsnprintf(logcontent, sizeof(logcontent), format, arg); // 第三個參數(shù)是格式, 第四個就是初始化好的可變參數(shù)
std::cout << logprefix << logcontent << std::endl;
}
日志啟動測試
這里就像一個日志的格式的樣子了,稍后將其寫入文件中
?
?
TCP通用服務器(守護進程)?* ?
????????這時候一個基本的TCP服務器已經(jīng)完成,但是有一個很糟糕的顯現(xiàn)會出現(xiàn),我這里是使用Xshell來進行遠程鏈接的,但是一旦關閉Xshell窗口的時候,服務器就自動退出了,顯然實際中的服務器是不能這樣的,于是我們需要引入守護進程(精靈進程)的概念,利用這個方法去解決這個問題,將它守護進程化
新指令1:jobs -- 查看進程作業(yè)
?
?
新指令2:fg --?foreground(前臺)
新指令3:bg(Ctrl + Z) -- background(后臺)
有且只有一個前臺任務!
?
創(chuàng)建新的會話
當Xshell退出的時候,會話窗口就會退出,那么任務就會被自動清理,所以需要創(chuàng)建新的會話
?
新接口2:daemon() -- 選用(本文不用)
Linux里面提供了創(chuàng)建守護進程的方式,但是下面就模擬實現(xiàn)一個daemon,來方便學習
?
為了完成守護進程我們滿足下面四點
#pragma once
#include <unistd.h>
void daemonSelf()
{
// 1. 讓調(diào)用進程忽略異常的信號 -- 否則一些進程碰到就直接掛了
// 2. 如何讓自己不是組長, setsid
// 3. 守護進程是脫離終端的,關閉或者重定向以前進程默認打開的文件
// 4. 可選:進程執(zhí)行路徑發(fā)生更改
}
?
新接口3:setsid() *
?
小細節(jié)1:null文件 -- 任何請求都可以被接收,但是都會被丟棄
????????守護進程是脫離終端的,關閉或者重定向以前進程默認打開的文件,因為 0 1 2 這幾個文件已經(jīng)被打開使用了,是不能用的,且不能關閉 0 1 2 這幾個文件 -- 一旦出現(xiàn)一個打印,就相當于向一個不存在的文件里面寫入了,立馬報錯,然后崩潰,所以我們就可以利用一個特殊的文件中寫入 -- null
接口1:open() -- 打開文件
?
返回值
open函數(shù)的返回值如果操作成功,它將返回一個文件描述符,如果操作失敗,它將返回-1。
參數(shù)含義:
1、pathname:
在open函數(shù)中第一個參數(shù)pathname是指向想要打開的文件路徑名,或者文件名。我們需要注意的是,這個路徑名是絕對路徑名。文件名則是在當前路徑下的。
2、flags:
flags參數(shù)表示打開文件所采用的操作,我們需要注意的是:必須指定以下三個常量的一種,且只允許指定一個
- O_RDONLY:只讀模式
- O_WRONLY:只寫模式
- O_RDWR:可讀可寫
參考文獻:linux open函數(shù)詳解_open 函數(shù)具體做了什么
接口2:dup2 -- 重定向
?
接口3:chdir() -- 進程執(zhí)行路徑更改(選填)
文件:daemon.hpp(守護進程)
#pragma once
#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define DEV "/dev/null"
void daemonSelf(const char *currPath = nullptr)
{
// 1. 讓調(diào)用進程忽略異常的信號 -- 否則一些進程碰到就直接掛了
signal(SIGPIPE, SIG_IGN); // 意思是說,如果客戶端提前關閉了,服務器還在寫入就會導致錯誤寫入導致服務器崩掉
// 2. 如何讓自己不是組長, setsid
if (fork() > 0)
exit(0);
// 子進程 -- 守護進程(精靈進程):本質(zhì)就是孤兒進程的一種
pid_t n = setsid();
assert(n != -1);
// 3. 守護進程是脫離終端的,關閉或者重定向以前進程默認打開的文件
// 因為 0 1 2 這幾個文件已經(jīng)被打開使用了,是不能用的
// 且不能關閉 0 1 2 這幾個文件 -- 一旦出現(xiàn)一個打印,就相當于向一個不存在的文件里面寫入了,立馬報錯,然后崩潰
// 所以我們就可以利用一個特殊的文件中寫入 -- null
// 這樣重定向到nuill中,這樣進程的寫入和讀取就不會報錯了
int fd = open(DEV, O_RDWR);
if(fd >= 0)
{
dup2(fd, 0); // 本來從 0 1 2 中讀,現(xiàn)在從fd中讀,也就是dev/null 中去讀
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
else
{
close(0);
close(1);
close(2);
}
// 4. 可選:進程執(zhí)行路徑發(fā)生更改
if(currPath) chdir(currPath); // 如果currPath被設置,這里就進行更改
}
測試:守護進程
?
測試服務器的正?;仫@
日志修改:日志寫入文件中保存
文章來源:http://www.zghlxwxcb.cn/news/detail-488434.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-488434.html
全部代碼文件
daemon.hpp -- 守護進程
#pragma once
#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define DEV "/dev/null"
void daemonSelf(const char *currPath = nullptr)
{
// 1. 讓調(diào)用進程忽略異常的信號 -- 否則一些進程碰到就直接掛了
signal(SIGPIPE, SIG_IGN); // 意思是說,如果客戶端提前關閉了,服務器還在寫入就會導致錯誤寫入導致服務器崩掉
// 2. 如何讓自己不是組長, setsid
if (fork() > 0)
exit(0);
// 子進程 -- 守護進程(精靈進程):本質(zhì)就是孤兒進程的一種
pid_t n = setsid(); // 走到這里,就已經(jīng)是一個 畫手進程,組長進程了,這時基于第二步產(chǎn)生的結(jié)果
assert(n != -1);
// 3. 守護進程是脫離終端的,關閉或者重定向以前進程默認打開的文件
// 因為 0 1 2 這幾個文件已經(jīng)被打開使用了,是不能用的
// 且不能關閉 0 1 2 這幾個文件 -- 一旦出現(xiàn)一個打印,就相當于向一個不存在的文件里面寫入了,立馬報錯,然后崩潰
// 所以我們就可以利用一個特殊的文件中寫入 -- null
// 這樣重定向到nuill中,這樣進程的寫入和讀取就不會報錯了
int fd = open(DEV, O_RDWR);
if(fd >= 0)
{
dup2(fd, 0); // 本來從 0 1 2 中讀,現(xiàn)在從fd中讀,也就是dev/null 中去讀
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
else
{
close(0);
close(1);
close(2);
}
// 4. 可選:進程執(zhí)行路徑發(fā)生更改
if(currPath) chdir(currPath); // 如果currPath被設置,這里就進行更改
}
log.hpp -- 日志文件
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <ctime>
#include <unistd.h>
#define LOG_NORMAL "log.txt" // 前三個放入這里,后兩個信息放入下面的文件中去
#define LOG_ERR "log.error"
// 定義五種不同的信息
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3 // 一種不影響服務器的錯誤
#define FATAL 4 // 致命錯誤
const char *to_levelstr(int level)
{
switch (level) // 這里直接return了
{
case DEBUG:
return "DEBUG";
case NORMAL:
return "NORMAL";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return nullptr;
}
}
// voidlogMessage(DEBUG, "hello %f, %d, %c", 3.14, 10, 'C');
void logMessage(int level, const char *format, ...)
{
// 格式如下
// [日志等級] [時間戳/時間] [pid] [message]
// [FATAL0] [2023-06-11 16:46:07] [123] [創(chuàng)建套接字失敗]
// 可變參數(shù)的提取邏輯
/* va_list start;
va_start(start);
while (*p)
{
switch (*p)
{
case '%':
p++;
if (*p == 'f')
arg = va_arg(start, float);
...
}
}
va_end(start); */
#define NUM 1024
// 獲取前綴信息[日志等級] [時間戳/時間] [pid]
char logprefix[NUM];
snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",
to_levelstr(level), (long int)time(nullptr), getpid());
// 獲取內(nèi)容
char logcontent[NUM];
va_list arg;
va_start(arg, format); // 因為壓棧是反過來的,所以直接使用左邊那個參數(shù)就行了
vsnprintf(logcontent, sizeof(logcontent), format, arg); // 第三個參數(shù)是格式, 第四個就是初始化好的可變參數(shù)
// std::cout << logprefix << logcontent << std::endl;
FILE *log = fopen(LOG_NORMAL, "a"); // 追加式寫入
FILE *err = fopen(LOG_ERR, "a");
if(log != nullptr && err != nullptr)
{
FILE *curr = nullptr;
if(level == DEBUG || level == NORMAL || level == WARNING) curr = log;
if(level == ERROR || level == FATAL) curr = err;
if(curr) fprintf(curr, "%s%s\n", logprefix, logcontent);
fclose(log);
fclose(err);
}
}
makefile
cc=g++
.PHONY:all
all:tcpserver tcpclient
tcpclient:tcpClient.cc
$(cc) -o $@ $^ -std=c++11
tcpserver:tcpServer.cc
$(cc) -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f tcpserver tcpclient
tcpClient.cc -- 客戶端1
#include "tcpClient.hpp"
#include <memory>
using namespace std;
static void Usage(string proc)
{
cout << "Usage:\n\t" << proc << " serverip serverport\n\n"; // 命令提示符
}
// ./tcpclient serverip serverport 調(diào)用邏輯
int main(int argc, char *argv[])
{
if(argc != 3)
{
Usage(argv[0]);
exit(1);
}
string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
unique_ptr<TcpClient> tcli(new TcpClient(serverip, serverport));
tcli->initClient();
tcli->start();
return 0;
}
tcpClient.hpp -- 客戶端2
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#define NUM 1024
class TcpClient
{
public:
TcpClient(const std::string &serverip, const uint16_t &port)
: _sock(1), _serverip(serverip), _serverport(port)
{
}
void initClient()
{
// 1. 創(chuàng)建socket
_sock = socket(AF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
// 客戶端也可以有日志,不過這里就不再實現(xiàn)了,直接打印錯誤
std::cout << "socket create error" << std::endl;
exit(2);
}
// 2. tcp的客戶端要不要bind? 要的! 但是不需要顯示bind,這里的client port要讓OS自定!
// 3. 要不要listen? -- 不需要!客戶端不需要建立鏈接
// 4. 要不要accept? -- 不要!
// 5. 要什么? 要發(fā)起鏈接!
}
void start()
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(_serverport);
server.sin_addr.s_addr = inet_addr(_serverip.c_str());
if (connect(_sock, (struct sockaddr *)&server, sizeof(server)) != 0)
{
std::cerr << "socket connect error" << std::endl;
}
else
{
std::string msg;
while (true)
{
std::cout << "Enter# ";
std::getline(std::cin, msg);
write(_sock, msg.c_str(), msg.size());
char buffer[NUM];
int n = read(_sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
// 目前我們把讀到的數(shù)據(jù)當成字符串, 截至目前
buffer[n] = 0;
std::cout << "Server回顯# " << buffer << std::endl;
}
else
{
break;
}
}
}
}
~TcpClient()
{
if(_sock >= 0) close(_sock); //不寫也行,因為文件描述符的生命周期隨進程,所以進程退了,自然也就會自動回收了
}
private:
int _sock;
std::string _serverip;
uint16_t _serverport;
};
tcpServer.cc -- 服務端1
#include "tcpServer.hpp"
#include "daemon.hpp"
#include <memory>
using namespace server;
using namespace std;
static void Usage(string proc)
{
cout << "Usage:\n\t" << proc << " local_port\n\n"; // 命令提示符
}
// tcp服務器,啟動上和udp server一模一樣
// ./tcpserver local_port
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
// 測試 守護進程
uint16_t port = atoi(argv[1]);
unique_ptr<TcpServer> tsvr(new TcpServer(port));
tsvr->initServer();
daemonSelf();
tsvr->start();
return 0;
}
tcpServer.hpp -- 服務端2
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include "log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
namespace server
{
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
static const uint16_t gport = 8080;
static const int gbacklog = 5; // 10、20、50都可以,但是不要太大比如5千,5萬
class TcpServer; // 聲明
// 用以線程傳參
class ThreadData
{
public:
ThreadData(TcpServer *self, int sock) : _self(self), _sock(sock)
{
}
public:
TcpServer *_self;
int _sock;
};
class TcpServer
{
public:
TcpServer(const uint16_t &port = gport) : _listensock(-1), _port(port)
{
}
void initServer()
{
// 1. 創(chuàng)建socket文件套接字對象 -- 流式套接字
_listensock = socket(AF_INET, SOCK_STREAM, 0); // 第三個參數(shù)默認 0
if (_listensock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success: %d", _listensock);
// 2.bind綁定自己的網(wǎng)路信息 -- 注意包含頭文件
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port); // 這里有個細節(jié),我們會發(fā)現(xiàn)當我們接受數(shù)據(jù)的時候是不需要主機轉(zhuǎn)網(wǎng)路序列的,因為關于IO類的接口,內(nèi)部都幫我們實現(xiàn)了這一功能,這里不幫我們做是因為我們傳入的是一個結(jié)構體,系統(tǒng)做不到
local.sin_addr.s_addr = INADDR_ANY; // 接受任意ip地址
if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
// 3. 設置socket 為監(jiān)聽狀態(tài) -- TCP與UDP不同,它先要建立鏈接之后,TCP是面向鏈接的,后面還會有“握手”過程
if (listen(_listensock, gbacklog) < 0) // 第二個參數(shù)backlog后面再填這個坑
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success");
}
void start()
{
// version 4. 線程池初始化
ThreadPool<Task>::getInstance()->run(); // 讓它跑起來
logMessage(NORMAL, "Thread init success");
for (;;) // 一個死循環(huán)
{
// 4. server 獲取新鏈接
// sock 和client 進行通信的fd
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
{
logMessage(ERROR, "accept error, next"); // 這個不影響服務器的運行,用ERROR,就像張三不會因為沒有把人招呼進來就不干了
continue;
}
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // 因為支持可變參數(shù)了
// 日志測試
logMessage(DEBUG, "accept error, next");
logMessage(WARNING, "accept error, next");
logMessage(FATAL, "accept error, next");
logMessage(NORMAL, "accept error, next");
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
logMessage(DEBUG, "accept error, next");
logMessage(WARNING, "accept error, next");
logMessage(FATAL, "accept error, next");
logMessage(NORMAL, "accept error, next");
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
logMessage(DEBUG, "accept error, next");
logMessage(WARNING, "accept error, next");
logMessage(FATAL, "accept error, next");
logMessage(NORMAL, "accept error, next");
// version 4 線程池 -- 默認啟動10個
ThreadPool<Task>::getInstance()->push(Task(sock, serviceIO));
}
}
~TcpServer() {}
private:
int _listensock; // 修改二:改為listensock 不是用來進行數(shù)據(jù)通信的,它是用來監(jiān)聽鏈接到來,獲取新鏈接的!
uint16_t _port;
};
} // namespace server
Task.hpp --?形成任務?
?
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
// 因為這里的服務和類里面是沒有任何關系的,所以代碼可以直接拿出去
void serviceIO(int sock)
{
char buffer[1024];
while (true) // 其實這里的任務是不適合線程池處理的,一個任務來了就會拿走一個線程,所以線程池處理的應該是一個很快就可以結(jié)束的服務
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
// 目前我們把讀到的數(shù)據(jù)當成字符串, 截止目前
buffer[n] = 0;
std::cout << "recv message: " << buffer << std::endl;
std::string outbuffer = buffer;
outbuffer += " server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size()); // 多路轉(zhuǎn)接
}
else if (n == 0)
{
// 代表client退出
logMessage(NORMAL, "client quit, me too!");
break;
}
}
close(sock); // 在內(nèi)部關閉就行了
}
class Task
{
using func_t = std::function<void(int)>;
public:
Task()
{
}
Task(int sock, func_t func)
: _sock(sock), _callback(func)
{
}
void operator()()
{
_callback(_sock);
}
private:
int _sock;
func_t _callback;
};
?
Thread.hpp -- 線程池1
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>
namespace ThreadNs
{
typedef std::function<void *(void *)> func_t;
const int num = 1024;
class Thread
{
private:
// 在類內(nèi)創(chuàng)建線程,想讓線程執(zhí)行對應的方法,需要將方法設置成為static
static void *start_routine(void *args) // 類內(nèi)成員,有缺省參數(shù)!
{
Thread *_this = static_cast<Thread *>(args);
return _this->callback();
}
public:
Thread()
{
char namebuffer[num];
snprintf(namebuffer, sizeof namebuffer, "thread-%d", threadnum++);
name_ = namebuffer;
}
void start(func_t func, void *args = nullptr)
{
func_ = func;
args_ = args;
int n = pthread_create(&tid_, nullptr, start_routine, this); // TODO
assert(n == 0);
(void)n;
}
void join()
{
int n = pthread_join(tid_, nullptr);
assert(n == 0);
(void)n;
}
std::string threadname()
{
return name_;
}
~Thread()
{
// do nothing
}
void *callback() { return func_(args_);}
private:
std::string name_;
func_t func_;
void *args_;
pthread_t tid_;
static int threadnum;
};
int Thread::threadnum = 1;
} // end namespace ThreadNs
ThreadPool.hpp -- 線程池2
#pragma once
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "log.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
using namespace ThreadNs;
const int gnum = 10;
template <class T>
class ThreadPool;
template <class T>
class ThreadData
{
public:
ThreadPool<T> *threadpool;
std::string name;
public:
ThreadData(ThreadPool<T> *tp, const std::string &n) : threadpool(tp), name(n)
{
}
};
template <class T>
class ThreadPool
{
private:
static void *handlerTask(void *args)
{
ThreadData<T> *td = (ThreadData<T> *)args;
while (true)
{
T t;
{
LockGuard lockguard(td->threadpool->mutex());
while (td->threadpool->isQueueEmpty())
{
td->threadpool->threadWait();
}
t = td->threadpool->pop(); // pop的本質(zhì),是將任務從公共隊列中,拿到當前線程自己獨立的棧中
}
t();
}
delete td;
return nullptr;
}
ThreadPool(const int &num = gnum) : _num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 0; i < _num; i++)
{
_threads.push_back(new Thread());
}
}
void operator=(const ThreadPool &) = delete;
ThreadPool(const ThreadPool &) = delete;
public:
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
bool isQueueEmpty() { return _task_queue.empty(); }
void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
T pop()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
pthread_mutex_t *mutex()
{
return &_mutex;
}
public:
void run()
{
for (const auto &t : _threads)
{
ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
t->start(handlerTask, td);
//std::cout << t->threadname() << " start ..." << std::endl;
logMessage(DEBUG, "%s start ...", t->threadname().c_str());
}
}
void push(const T &in)
{
LockGuard lockguard(&_mutex);
_task_queue.push(in);
pthread_cond_signal(&_cond);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
for (const auto &t : _threads)
delete t;
}
static ThreadPool<T> *getInstance()
{
if (nullptr == tp)
{
_singlock.lock();
if (nullptr == tp)
{
tp = new ThreadPool<T>();
}
_singlock.unlock();
}
return tp;
}
private:
int _num;
std::vector<Thread *> _threads;
std::queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *tp;
static std::mutex _singlock;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;
template <class T>
std::mutex ThreadPool<T>::_singlock;
LockGuard.hpp -- 線程池(加鎖部分)
?
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *lock_p = nullptr): lock_p_(lock_p)
{}
void lock()
{
if(lock_p_) pthread_mutex_lock(lock_p_);
}
void unlock()
{
if(lock_p_) pthread_mutex_unlock(lock_p_);
}
~Mutex()
{}
private:
pthread_mutex_t *lock_p_;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex): mutex_(mutex)
{
mutex_.lock(); //在構造函數(shù)中進行加鎖
}
~LockGuard()
{
mutex_.unlock(); //在析構函數(shù)中進行解鎖
}
private:
Mutex mutex_;
};
?
到了這里,關于簡單的TCP網(wǎng)絡程序·線程池(后端服務器)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!