目錄
一、線程池模塊
Thread.h
LockGuard.h
ThreadPool.h
二、任務(wù)模塊模塊
Task.h
三、日志模塊
Log.h
四、守護(hù)進(jìn)程模塊
Deamon.h
?五、TCP通信模塊
Server.h
Client.h
server.cpp
client.cpp
關(guān)于TCP通信協(xié)議的封裝,此篇博客有詳述:
【Linux后端服務(wù)器開發(fā)】TCP通信設(shè)計_命運on-9的博客-CSDN博客
線程池的設(shè)計,包含線程的封裝、互斥鎖的封裝、線程池的封裝
TCP通信的設(shè)計包含服務(wù)器的封裝、客戶端的封裝
我們將任務(wù)代碼和服務(wù)器解耦,需要再單獨設(shè)計Task任務(wù)模塊
為了模擬服務(wù)器設(shè)計的完整性,我們需要再設(shè)計一個日志模塊
在很多情況下,服務(wù)器都是一個后臺進(jìn)程(守護(hù)進(jìn)程),我們需要再設(shè)計一個守護(hù)進(jìn)程模塊
一、線程池模塊
線程池設(shè)計為單例模式,線程池容量根據(jù)系統(tǒng)CPU的核數(shù)決定。
互斥鎖設(shè)計為只能指針模式,增加安全性,避免出現(xiàn)死鎖情況。
線程在調(diào)用函數(shù)的時候,函數(shù)不能是類內(nèi)函數(shù),所以即使需要調(diào)用的函數(shù)聲明在類內(nèi),也需要將函數(shù)設(shè)置為靜態(tài)。
Thread.h
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>
class Thread
{
using func_t = std::function<void*(void*)>;
public:
Thread()
{
char buf[1024];
snprintf(buf, sizeof(buf), "thread-%d", s_thread_num++);
_name = buf;
}
void Start(func_t func, void* args = nullptr)
{
_func = func;
_args = args;
pthread_create(&_tid, nullptr, Start_Routine, this);
}
void Join()
{
int n = pthread_join(_tid, nullptr);
assert(n == 0);
}
std::string Thread_Name()
{
return _name;
}
private:
static void* Start_Routine(void* args)
{
Thread* tmp = static_cast<Thread*>(args);
return tmp->Call_Back();
}
void* Call_Back()
{
_func(_args);
}
private:
std::string _name;
func_t _func;
void* _args;
pthread_t _tid;
static int s_thread_num;
};
int Thread::s_thread_num = 1;
LockGuard.h
# pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t* plock)
: _plock(plock)
{}
void lock()
{
if (_plock)
pthread_mutex_lock(_plock);
}
void unlock()
{
if (_plock)
pthread_mutex_unlock(_plock);
}
private:
pthread_mutex_t* _plock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t* plock)
: _mutex(plock)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
ThreadPool.h
#pragma once
#include "Thread.h"
#include "LockGuard.h"
#include <vector>
#include <queue>
#include <mutex>
#include <unistd.h>
using namespace std;
const int g_num = 8;
template<class T>
class ThreadPool;
template<class T>
class ThreadData
{
public:
ThreadPool<T>* _tp;
std::string _name;
ThreadData(ThreadPool<T>* tp, const std::string& name)
: _tp(tp), _name(name)
{}
};
template<class T>
class ThreadPool
{
public:
ThreadPool(const int& num = g_num)
: _num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 0; i < num; ++i)
_threads.push_back(new Thread());
}
// 單例模式
static ThreadPool<T>* Get_Instance()
{
if (tp == nullptr)
{
sing_lock.lock();
tp = new ThreadPool<T>();
sing_lock.unlock();
}
return tp;
}
// 禁用拷貝與賦值
ThreadPool(const ThreadPool<T>&) = delete;
ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
for (auto& t : _threads)
delete t;
}
public:
void Push(const T& in)
{
LockGuard lock(&_mutex);
_tasks.push(in);
pthread_cond_signal(&_cond);
}
T Pop()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
void Run()
{
for (const auto& t : _threads)
{
ThreadData<T>* td = new ThreadData<T>(this, t->Thread_Name());
t->Start(Handler_Task, td);
std::cout << t->Thread_Name() << " start ..." << endl;
}
}
private:
static void* Handler_Task(void* args)
{
ThreadData<T>* td = static_cast<ThreadData<T>*>(args);
while (true)
{
T t;
{
LockGuard lock(td->_tp->Mutex());
while (td->_tp->Tasks_Empty())
{
td->_tp->Thread_Wait();
}
t = td->_tp->Pop();
}
t(); // 執(zhí)行任務(wù)
}
delete td;
return nullptr;
}
private:
pthread_mutex_t* Mutex()
{
return &_mutex;
}
void Lock_Queue()
{
pthread_mutex_lock(&_mutex);
}
void Unlock_Queue()
{
pthread_mutex_unlock(&_mutex);
}
void Thread_Wait()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool Tasks_Empty()
{
return _tasks.empty();
}
private:
int _num;
vector<Thread*> _threads;
queue<T> _tasks;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T>* tp;
static mutex sing_lock;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp = nullptr;
template<class T>
mutex ThreadPool<T>::sing_lock;
二、任務(wù)模塊模塊
將任務(wù)封裝成類對象仿函數(shù),執(zhí)行任務(wù)就是對類做()的重定義,當(dāng)然當(dāng)我們需要運行多個任務(wù)時,可以再次Task類對象和實際要運行的任務(wù)進(jìn)行解耦,有類對象調(diào)用其他的任務(wù)函數(shù)。
Task.h
#pragma once
#include <iostream>
#include <unistd.h>
using namespace std;
class Task
{
public:
Task()
{}
Task(int client_sock, string client_ip, uint16_t client_port)
: _client_sock(client_sock), _client_ip(client_ip), _client_port(client_port)
{}
void operator()()
{
while (true)
{
// 5.1 接收信息
char recv_buf[1024];
int n = read(_client_sock, recv_buf, sizeof(recv_buf));
if (n == 0)
return;
recv_buf[n] = 0;
cout << "[" << _client_ip << ":" << _client_port << "]# " << recv_buf << endl;
// 5.2 應(yīng)答信息
char sent_buf[1024];
snprintf(sent_buf, sizeof(sent_buf), "服務(wù)器已收到信息: %s\n", recv_buf);
write(_client_sock, sent_buf, sizeof(sent_buf));
}
}
private:
int _client_sock;
string _client_ip;
uint16_t _client_port;
};
三、日志模塊
日志模塊需要我們記錄服務(wù)器的正常連接與異常連接,分別用兩個文件記錄。
日志內(nèi)容需要顯示連接的狀態(tài)、時間、進(jìn)程號,當(dāng)服務(wù)器與客戶端連接成功的時候,日志還要記錄客戶端的IP和端口號。
Log.h
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <memory>
#define LOG_NORMAL "log.txt"
#define LOG_ERROR "err.txt"
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
#define NUM 1024
const char* To_Lever_Str(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case NORMAL:
return "NORMAL";
case WARNING:
return "warning";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return nullptr;
}
}
void To_Time_Str(long int t, std::string& cur_time)
{
// 將時間戳轉(zhuǎn)換成tm結(jié)構(gòu)體
struct tm* cur;
cur = gmtime(&t);
cur->tm_hour = (cur->tm_hour + 8) % 24; //東八區(qū)
char tmp[NUM];
std::string my_format = "%Y-%m-%d %H:%M:%S";
strftime(tmp, sizeof(tmp), my_format.c_str(), cur);
cur_time = tmp;
}
// ... 可變參數(shù)列表
void Log_Message(int level, const char* format, const char* ip = "", const char* port = "")
{
// [日志等級][時間][pid][message]
char log_prefix[NUM];
std::string cur_time;
To_Time_Str(time(nullptr), cur_time);
snprintf(log_prefix, sizeof(log_prefix), "[%s][%s][pid:%d]",
To_Lever_Str(level), cur_time.c_str(), getpid());
std::string log_content = "";
if (strcmp(ip, "") != 0 || strcmp(port, "") != 0)
{
log_content += "[";
log_content += ip;
log_content += ": ";
log_content += port;
log_content += "]";
}
log_content += format;
FILE* log = fopen(LOG_NORMAL, "a"); // 連接記錄日志
FILE* err = fopen(LOG_ERROR, "a"); // 報錯日志
if (log != nullptr && err != nullptr)
{
FILE* curr = nullptr;
if (level == DEBUG || level == NORMAL || level == WARNING)
curr = log;
else
curr = err;
if (curr)
fprintf(curr, "%s%s\n", log_prefix, log_content.c_str());
fclose(log);
fclose(err);
}
}
四、守護(hù)進(jìn)程模塊
守護(hù)進(jìn)程模塊我們需要將服務(wù)器后臺化,讓進(jìn)程忽略掉異常的信號(服務(wù)器不能輕易掛掉,故需要屏蔽信號)。
守護(hù)進(jìn)程的本質(zhì)就是孤兒進(jìn)程,我們需要在服務(wù)器執(zhí)行的時候,分離出子進(jìn)程,殺死主進(jìn)程。
守護(hù)進(jìn)程脫離終端,關(guān)閉或重定向以前進(jìn)程默認(rèn)打開的文件描述符。
Deamon.h
#pragma once
#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define DEV "/dev/null"
void Deamon_Self(const char* curr_path = nullptr)
{
// 1. 讓進(jìn)程忽略掉異常的信號
signal(SIGPIPE, SIG_IGN);
// 2. 守護(hù)進(jìn)程,本質(zhì)就是孤兒進(jìn)程
if (fork() > 0)
exit(1);
pid_t n = setsid();
assert(n != -1);
// 3. 守護(hù)進(jìn)程脫離終端,關(guān)閉或重定向以前進(jìn)程默認(rèn)打開的文件
int fd = open(DEV, O_RDWR);
if (fd > 0)
{
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
else
{
close(0);
close(1);
close(2);
}
}
?五、TCP通信模塊
在上一篇博客的TCP通信協(xié)議的封裝中,沒有考慮TIME_WAIT引起bind失敗的問題(這個問題其實很少見,可以不管),此處為了通信協(xié)議更加完善,我們將其加上。
在socket套接字創(chuàng)建之后,使用socketopt()設(shè)置socket描述符的選項SO_REUSEADDR為1,表示允許創(chuàng)建端口號相同但是IP地址不同的多個socket描述符。文章來源:http://www.zghlxwxcb.cn/news/detail-602818.html
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSERADDR, &opt, sizeof(opt));
這樣設(shè)置之后,當(dāng)服務(wù)器關(guān)閉之后,可以立刻重啟。文章來源地址http://www.zghlxwxcb.cn/news/detail-602818.html
Server.h
#pragma once
#include <iostream>
#include <unistd.h>
#include <string>
#include <cstring>
#include <functional>
#include <cerrno>
#include <thread>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Task.h"
#include "Log.h"
#include "ThreadPool.h"
using namespace std;
const string g_default_ip = "0.0.0.0";
enum
{
USAGE_ERR = 1,
SOCK_ERR,
BIND_ERR,
LISTEN_ERR,
ACCEPT_ERR
};
class TcpServer
{
public:
TcpServer(const uint16_t port, const string& ip = g_default_ip)
: _port(port), _ip(ip), _listenfd(0)
{}
void Init()
{
// 1. 創(chuàng)建socket套接字
_listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (_listenfd < 0)
{
Log_Message(FATAL, "socket error");
exit(SOCK_ERR);
}
Log_Message(NORMAL, "socekt create success");
// 1.1 解決TIME_WAIT狀態(tài)bind失敗問題
int opt = 1;
setsockopt(_listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 2. bind綁定服務(wù)器網(wǎng)絡(luò)信息
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_addr.s_addr = htonl(INADDR_ANY);
local.sin_port = htons(_port);
if (bind(_listenfd, (struct sockaddr*)&local, sizeof(local)) < 0)
{
Log_Message(FATAL, "bind error");
exit(BIND_ERR);
}
Log_Message(NORMAL, "bind success");
// 3. listen設(shè)置監(jiān)聽
if (listen(_listenfd, 8) < 0)
{
// 監(jiān)聽的連接隊列長度與項目的線程數(shù)相關(guān)
Log_Message(FATAL, "listen error");
exit(LISTEN_ERR);
}
Log_Message(NORMAL, "listen success");
}
void Start()
{
ThreadPool<Task>::Get_Instance()->Run();
while (true)
{
// 4. accept連接客戶端
struct sockaddr_in client;
socklen_t client_len = sizeof(client);
int client_sock = accept(_listenfd, (struct sockaddr*)&client, &client_len);
if (client_sock < 0)
{
Log_Message(FATAL, "accept error");
exit(ACCEPT_ERR);
}
string client_ip = inet_ntoa(client.sin_addr);
uint16_t client_port = ntohs(client.sin_port);
Log_Message(NORMAL, "accept success", client_ip.c_str(), to_string(client_port).c_str());
// 5. 連接成功,進(jìn)行通信, 多線程
ThreadPool<Task>::Get_Instance()->Push(Task(client_sock, client_ip, client_port)); // 線程分離
}
}
private:
string _ip;
uint16_t _port;
int _listenfd;
};
Client.h
#pragma once
#include <iostream>
#include <unistd.h>
#include <string>
#include <cstring>
#include <functional>
#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.h"
using namespace std;
class TcpClient
{
public:
TcpClient(const uint16_t server_port, const string server_ip)
: _server_port(server_port), _server_ip(server_ip), _sock(-1)
{}
void Init()
{
// 1. 創(chuàng)建套接字
_sock = socket(AF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
cerr << "socket error " << errno << ": " << strerror(errno) << endl;
exit(1);
}
// 2. bind綁定,由OS綁定
}
void Run()
{
// 3. 向服務(wù)器發(fā)起連接請求
struct sockaddr_in server;
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(_server_ip.c_str());
server.sin_port = htons(_server_port);
if (connect(_sock, (struct sockaddr*)&server, sizeof(server)) != 0)
{
cerr << "connect error " << errno << ": " << strerror(errno) << endl;
exit(1);
}
// 4. 連接成功,進(jìn)行通信
while (true)
{
// 4.1 發(fā)送信息
char sent_buf[1024];
cout << "請輸入信息:";
gets(sent_buf);
write(_sock, sent_buf, sizeof(sent_buf));
// 4.2 接收應(yīng)答信息
char recv_buf[1024];
int n = read(_sock, recv_buf, sizeof(recv_buf));
if (n > 0)
recv_buf[n] = 0;
cout << recv_buf << endl;
}
}
private:
string _server_ip;
uint16_t _server_port;
int _sock;
};
server.cpp
#include "Server.h"
#include "Deamon.h"
#include <memory>
void Usage()
{
cout << "Usage:\n\tserver port" << endl;
exit(USAGE_ERR);
}
int main(int args, char* argv[])
{
if (args != 2)
{
Log_Message(FATAL, "usage error");
Usage();
}
uint16_t port = atoi(argv[1]);
unique_ptr<TcpServer> tcp_server(new TcpServer(port));
Deamon_Self();
tcp_server->Init();
tcp_server->Start();
return 0;
}
client.cpp
#include "Client.h"
#include <memory>
void Usage()
{
cout << "Usage:\n\tclient ip port" << endl;
exit(1);
}
int main(int args, char* argv[])
{
if (args != 3)
{
Log_Message(FATAL, "usage error");
Usage();
}
string server_ip = argv[1];
uint16_t server_port = atoi(argv[2]);
unique_ptr<TcpClient> tcp_client(new TcpClient(server_port, server_ip));
tcp_client->Init();
tcp_client->Run();
return 0;
}
到了這里,關(guān)于【Linux后端服務(wù)器開發(fā)】封裝線程池實現(xiàn)TCP多線程通信的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!