實現(xiàn)目標(biāo)
利用線程池多線程并發(fā)實現(xiàn)基于TCP通信的多個客戶端與服務(wù)端之間的交互,客戶端發(fā)送數(shù)據(jù),服務(wù)端接收后處理數(shù)據(jù)并返回。服務(wù)端為守護進程
實現(xiàn)步驟
- 封裝一個記錄日志的類,將程序運行的信息保存到文件
- 封裝線程類、服務(wù)端處理任務(wù)類以及將鎖進行封裝,為方便實現(xiàn)線程池
- 實現(xiàn)服務(wù)端,使服務(wù)端能接收客戶端所發(fā)來的數(shù)據(jù),處理數(shù)據(jù)后返回。服務(wù)端采用多線程并發(fā)處理
- 封裝守護進程方法,使服務(wù)端為守護進程
- 實現(xiàn)客戶端,可以向服務(wù)端發(fā)送數(shù)據(jù),并接收到服務(wù)端發(fā)送回來的數(shù)據(jù)
封裝日志類
將程序運行的信息保存到指定文件,例如創(chuàng)建套接字成功或者失敗等信息。以【狀態(tài)】【時間】【信息】的格式保存。
狀態(tài)可分為五種:“DEBUG”,“NORMAL”,“WARNING”,“ERROR”,“FATAL”
日志類保存的信息需帶有可變參數(shù)
#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
using namespace std;
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *to_levelstr(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case NORMAL:
return "NORMAL";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return nullptr;
}
}
void LogMessage(int level, const char *format, ...)
{
#define NUM 1024
char logpre[NUM];
snprintf(logpre, sizeof(logpre), "[%s][%ld][%d]", to_levelstr(level), (long int)time(nullptr), getpid());
char line[NUM];
// 可變參數(shù)
va_list arg;
va_start(arg, format);
vsnprintf(line, sizeof(line), format, arg);
// 保存至文件
FILE* log = fopen("log.txt", "a");
FILE* err = fopen("log.error", "a");
if(log && err)
{
FILE *curr = nullptr;
if(level == DEBUG || level == NORMAL || level == WARNING)
curr = log;
if(level == ERROR || level == FATAL)
curr = err;
if(curr) fprintf(curr, "%s%s\n", logpre, line);
fclose(log);
fclose(err);
}
}
封裝線程池
封裝線程
將線程的創(chuàng)建,等待封裝成類的成員函數(shù)。不再需要單個的條用線程庫接口,以對象的方式創(chuàng)建。
需要注意:在類里面的線程回調(diào)方法必須設(shè)為static類型,而靜態(tài)的方法是不能訪問類內(nèi)成員的,因此傳給回調(diào)函數(shù)的參數(shù)需要將整個對象傳過去,通過對象來獲取類內(nèi)成員
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>
typedef std::function<void *(void *)> func_t;
class Thread
{
private:
// 在類內(nèi)創(chuàng)建線程,想讓線程執(zhí)行對應(yīng)的方法,需要將方法設(shè)置成為static
static void *start_routine(void *args) // 類內(nèi)成員,有缺省參數(shù)!
{
Thread *_this = static_cast<Thread *>(args);
return _this->callback();
}
public:
// 構(gòu)造函數(shù)里直接生成線程名,利用靜態(tài)變量從1開始
Thread()
{
char namebuffer[1024];
snprintf(namebuffer, sizeof namebuffer, "thread-NO.%d", threadnum++);
_name = namebuffer;
}
// 線程啟動
void start(func_t func, void *args = nullptr)
{
_func = func;
_args = args;
// 由于靜態(tài)的方法是不能訪問類內(nèi)成員的,
// 因此傳給回調(diào)函數(shù)的參數(shù)需要將整個對象傳過去,通過對象來獲取類內(nèi)成員
// 也就是this指針
int n = pthread_create(&_tid, nullptr, start_routine, this);
assert(n == 0);
(void)n;
}
// 線程等待
void join()
{
int n = pthread_join(_tid, nullptr);
assert(n == 0);
(void)n;
}
~Thread()
{
}
void *callback()
{
return _func(_args);
}
private:
std::string _name; // 類名
func_t _func; // 線程回調(diào)函數(shù)
void *_args; // 線程回調(diào)函數(shù)的參數(shù)
pthread_t _tid; // 線程id
static int threadnum; // 線程的編號,為生成線程名
};
// static的成員需在類外初始化
int Thread::threadnum = 1;
封裝鎖
同樣的為了不再需要一直調(diào)用系統(tǒng)接口,可以將整個方法封裝成類,通過類的對象實現(xiàn)加鎖過程
#pragma once
#include <iostream>
#include <pthread.h>
// 加鎖 解鎖
class Mutex
{
public:
Mutex(pthread_mutex_t *lock_p = nullptr) : _lock_p(lock_p)
{
}
// 加鎖
void lock()
{
if (_lock_p)
pthread_mutex_lock(_lock_p);
}
// 解鎖
void unlock()
{
if (_lock_p)
pthread_mutex_unlock(_lock_p);
}
~Mutex()
{
}
private:
pthread_mutex_t *_lock_p;
};
// 鎖的類
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex) : _mutex(mutex)
{
_mutex.lock(); // 在構(gòu)造函數(shù)中進行加鎖
}
~LockGuard()
{
_mutex.unlock(); // 在析構(gòu)函數(shù)中進行解鎖
}
private:
Mutex _mutex;
};
封裝線程池
在類里面的線程回調(diào)方法必須設(shè)為static類型,而靜態(tài)的方法是不能訪問類內(nèi)成員的,因此傳給回調(diào)函數(shù)的參數(shù)需要將整個對象傳過去,通過對象來獲取類內(nèi)成員。
線程池需要實現(xiàn)為單例模式:
- 第一步就是把構(gòu)造函數(shù)私有,再把拷貝構(gòu)造和賦值運算符重載delete
- 在設(shè)置獲取單例對象的函數(shù)的時候,注意要設(shè)置成靜態(tài)成員函數(shù),因為在獲取對象前根本沒有對象,無法調(diào)用非靜態(tài)成員函數(shù)
- 可能會出現(xiàn)多個線程同時申請資源的場景,所以還需要一把鎖來保護這塊資源,而這把鎖也得設(shè)置成靜態(tài),因為單例模式的函數(shù)是靜態(tài)的
#pragma once
#include "Thread.hpp"
#include "log.hpp"
#include "Lock.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>
using namespace std;
// 線程池類定義位于下面,因此屬性類想要獲取到
// 就必須在前面聲明
template <class T>
class ThreadPool;
template <class T>
class ThreadData
{
public:
ThreadPool<T> *threadpool; // 線程所在的線程池,獲取到線程的this指針
std::string _name; // 線程的名字
public:
ThreadData(ThreadPool<T> *tp, const std::string &name) : threadpool(tp), _name(name)
{
}
};
template <class T>
class ThreadPool
{
private:
// 線程最終實現(xiàn)的方法
static void *handlerTask(void *args)
{
ThreadData<T> *td = (ThreadData<T> *)args;
while (true)
{
T t;
{
LockGuard lockguard(td->threadpool->mutex());
while (td->threadpool->isQueueEmpty())
{
td->threadpool->threadWait();
}
t = td->threadpool->pop();
}
t();
}
delete td;
return nullptr;
}
ThreadPool(const int &num = 10) : _num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 0; i < _num; i++)
{
_threads.push_back(new Thread());
}
}
void operator=(const ThreadPool &) = delete;
ThreadPool(const ThreadPool &) = delete;
public:
// 將加鎖 解鎖 判斷任務(wù)隊列是否為空 和條件變量等待全部封裝成類內(nèi)方法
// 方便在線程的回調(diào)方法中通過對象直接調(diào)用
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
bool isQueueEmpty() { return _task_queue.empty(); }
void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
// 任務(wù)隊列刪除隊頭,并返回隊頭的任務(wù)
T pop()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
pthread_mutex_t *mutex()
{
return &_mutex;
}
public:
// 讓每個線程對象調(diào)用其啟動函數(shù),并將線程輔助類和最終執(zhí)行的任務(wù)方法傳入函數(shù)中
// 線程的輔助類對象里包含了線程當(dāng)前線程池對象,也就是可以
// 通過輔助類對象可以調(diào)用到線程池對象里的成員
void run()
{
for (const auto &t : _threads)
{
ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
t->start(handlerTask, td);
// 創(chuàng)建成功后打印日志
LogMessage(DEBUG, "%s start ...", t->threadname().c_str());
}
}
// 往任務(wù)隊列里插入一個任務(wù)
void push(const T &in)
{
LockGuard lockguard(&_mutex);
_task_queue.push(in);
pthread_cond_signal(&_cond);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
for (const auto &t : _threads)
delete t;
}
// 實現(xiàn)單例模式
static ThreadPool<T> *getInstance()
{
if (nullptr == tp)
{
_singlock.lock();
if (nullptr == tp)
{
tp = new ThreadPool<T>();
}
_singlock.unlock();
}
return tp;
}
private:
int _num;//線程的數(shù)量
std::vector<Thread *> _threads;//線程組
std::queue<T> _task_queue;//任務(wù)隊列
pthread_mutex_t _mutex;//鎖
pthread_cond_t _cond;//條件變量
static ThreadPool<T> *tp;
static std::mutex _singlock;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;
template <class T>
std::mutex ThreadPool<T>::_singlock;
TCP通信的接口和注意事項
為了實現(xiàn)TCP版的通信,首先來了解一下相關(guān)接口和注意事項
- TCP需要在通信前先創(chuàng)建鏈接,因此在TCP沒有鏈接之前其創(chuàng)建的套接字并不是用來通信的,而是用來監(jiān)聽的。一旦創(chuàng)建鏈接成功后,才會返回一個用來通信的套接字
- TCP時面向字節(jié)流的,因此其通信就是往文件上IO,因此不用指定的調(diào)用某接口去完成,直接用文件接口讀寫就可以完成
accept
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
這就是用來創(chuàng)建鏈接的接口
參數(shù)一為負(fù)責(zé)監(jiān)聽的套接字
參數(shù)二就是socket的結(jié)構(gòu)體
參數(shù)三為結(jié)構(gòu)體的大小
返回值,成功創(chuàng)建鏈接之后會返回一個值,這個值就是負(fù)責(zé)通信的套接字,也就是后面利用文件通信的文件描述符
TCP
封裝任務(wù)
因為上述說到TCP是可以直接使用文件操作來完成通信的,那么也就是說其通信根本就用不到其他的成員了,只需要知道一個套接字即可。那么這個方法就可以不放在類內(nèi),因為這就是線程最后的執(zhí)行目的,因此可以將這個任務(wù)單獨放到一個頭文件中。因為線程池是一個模板類,則可以封裝一個任務(wù)類。
#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <functional>
#include "log.hpp"
// TCP的通信
// 線程的最終執(zhí)行方法
void ServerIO(int sock)
{
char buffer[1024];
while (true)
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
// read
buffer[n] = 0;
std::cout << "recv message: " << buffer << std::endl;
// write
std::string outbuffer = buffer;
outbuffer += " server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size());
}
else if (n == 0)
{
// 代表client退出
LogMessage(NORMAL, "client quit, me too!");
break;
}
}
close(sock);
}
// 任務(wù)類
// 為了最終執(zhí)行的方法而服務(wù)
class Task
{
using func_t = std::function<void(int)>;
public:
Task()
{
}
Task(int sock, func_t func)
: _sock(sock), _callback(func)
{
}
void operator()()
{
_callback(_sock);
}
private:
int _sock; // 通信套接字
func_t _callback;
};
客戶端
客戶端不需要顯示的綁定端口號,而是由操作系統(tǒng)隨機去綁定。TCP的客戶端也不需要監(jiān)聽,因為并沒有去主動鏈接客戶端,所以不需要accept。TCP的客戶端只需要向服務(wù)端發(fā)起鏈接請求
Client.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "log.hpp"
using namespace std;
class Client
{
public:
Client(const string &serverip, const uint16_t &port)
: _serverip(serverip), _port(port), _sock(-1)
{
}
void Init()
{
// 創(chuàng)建套接字
_sock = socket(AF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
LogMessage(FATAL, "create socket error");
exit(1);
}
// TCP的客戶端也不需要顯示綁定端口,讓操作系統(tǒng)隨機綁定
// TCP的客戶端也不需要監(jiān)聽,因為并沒有去主動鏈接客戶端,所以不需要accept
// TCP的客戶端只需要向服務(wù)端發(fā)起鏈接請求
}
void start()
{
// 向服務(wù)端發(fā)起鏈接請求
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = inet_addr(_serverip.c_str());
if (connect(_sock, (struct sockaddr *)&local, sizeof(local)) < 0)
LogMessage(ERROR, "connect socket error");
// 和服務(wù)端通信
else
{
string line;
while (1)
{
cout << "Please cin: " << endl;
getline(cin, line);
// 向服務(wù)端寫
write(_sock, line.c_str(), line.size());
// 讀服務(wù)端返回來的數(shù)據(jù)
char buff[1024];
int n = read(_sock, buff, sizeof(buff) - 1);
if (n > 0)
{
buff[n] = 0;
cout << "接收到的消息為:" << buff << endl;
}
else
break;
}
}
}
~Client()
{
if(_sock >= 0)
close(_sock);
}
private:
int _sock;
string _serverip;
uint16_t _port;
};
Client.cc
#include "Client.hpp"
#include <memory>
// 輸出命令錯誤函數(shù)
void Usage(string proc)
{
cout << "Usage:\n\t" << proc << " local_ip local_port\n\n";
}
int main(int argc, char* argv[])
{
// 再運行客戶端時,輸入的指令需要包括主機ip和端口號
if(argc != 3)
{
Usage(argv[0]);
exit(1);
}
string serverip = argv[1];
uint16_t port = atoi(argv[2]);
unique_ptr<Client> client(new Client(serverip, port));
client->Init();
client->start();
return 0;
}
服務(wù)端
那么對于服務(wù)端而言,必須要顯式的去綁定端口號。則創(chuàng)建的套接字并不是負(fù)責(zé)通信的。創(chuàng)建好套接字和綁定完網(wǎng)絡(luò)信息后,需要設(shè)置創(chuàng)建的套接字為監(jiān)聽狀態(tài)。和UDP一樣,服務(wù)端是不能指定IP的.
還需要注意的是:因為封裝的線程池是單例模式,所以不需要創(chuàng)建對象,直接調(diào)用靜態(tài)對象去調(diào)用類方法即可
步驟可分為:
- 創(chuàng)建監(jiān)聽套接字
- 綁定網(wǎng)絡(luò)信息
- 設(shè)置套接字為監(jiān)聽狀態(tài)
- 獲取鏈接,得到通信的套接字
- 通信
- 關(guān)閉不需要的套接字
Server.hpp
#pragma once
#include "Task.hpp"
#include "ThreadPool.hpp"
#include <sys/types.h>
#include <sys/socket.h>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
class Server
{
public:
Server(const uint16_t &port = 8000)
: _port(port)
{
}
void Init()
{
// 創(chuàng)建負(fù)責(zé)監(jiān)聽的套接字 面向字節(jié)流
_listenSock = socket(AF_INET, SOCK_STREAM, 0);
if (_listenSock < 0)
{
LogMessage(FATAL, "create socket error!");
exit(1);
}
LogMessage(NORMAL, "create socket %d success!", _listenSock);
// 綁定網(wǎng)絡(luò)信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_listenSock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
LogMessage(FATAL, "bind socket error!");
exit(3);
}
LogMessage(NORMAL, "bind socket success!");
// 設(shè)置socket為監(jiān)聽狀態(tài)
if (listen(_listenSock, 5) < 0)
{
LogMessage(FATAL, "listen socket error!");
exit(4);
}
LogMessage(NORMAL, "listen socket success!");
}
void start()
{
while (1)
{
// 因為線程池時單例模式,所以直接調(diào)用初始化
ThreadPool<Task>::getInstance()->run();
LogMessage(NORMAL, "Thread init success");
// server獲取建立新連接
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
// 創(chuàng)建通信的套接字
// accept的返回值才是真正用于通信的套接字
_sock = accept(_listenSock, (struct sockaddr *)&peer, &len);
if (_sock < 0)
{
// 獲取通信的套接字失敗并不影響未來的操作,只是當(dāng)前的鏈接失敗而已
LogMessage(ERROR, "accept socket error, next");
continue;
}
LogMessage(NORMAL, "accept socket %d success", _sock);
cout << "sock: " << _sock << endl;
// 往線程池的任務(wù)隊列里插入任務(wù)
ThreadPool<Task>::getInstance()->push(Task(_sock, ServerIO));
}
}
private:
int _listenSock; // 負(fù)責(zé)監(jiān)聽的套接字
int _sock; // 通信的套接字
uint16_t _port; // 端口號
};
Server.cc
#include "Server.hpp"
#include "daemon.hpp"
#include <memory>
// 輸出命令錯誤函數(shù)
void Usage(string proc)
{
cout << "Usage:\n\t" << proc << " local_ip local_port\n\n";
}
int main(int argc, char* argv[])
{
// 啟動服務(wù)端不需要指定IP
if(argc != 2)
{
Usage(argv[0]);
exit(1);
}
uint16_t port = atoi(argv[1]);
unique_ptr<Server> server(new Server(port));
server->Init();
server->start();
return 0;
}
實現(xiàn)效果
可以看到多個客戶端同時訪問也沒有問題,并且所對應(yīng)的套接字也就是文件描述符也不一樣。
守護進程
守護進程是一種特殊的孤兒進程,其運行于后臺,生存期較長并且獨立與終端周期性的執(zhí)行任務(wù)或者等待處理任務(wù)
進程分為前臺運行和后臺運行,每一個進程都會屬于一個會話組里。每一個會話組都有且只有能一個前臺進程。像上述的服務(wù)端,當(dāng)運行服務(wù)端時,操作系統(tǒng)會將其分到含有bash的會話組內(nèi),并且將服務(wù)端置為前臺任務(wù)進程,因此服務(wù)端運行時bash把放置后臺這也就是為什么用戶不能再bash繼續(xù)輸入命令的原因。
每一個會話組都會有一個組長,一般而言在bash中輸入命令執(zhí)行的進程都會分到bash的會話組內(nèi),這個會話組的組長即為bash。可以通過查看進程的SID確認(rèn)進程的會話組
可以看到上述圖片中運行了三個進程并置于后臺,他們的SID也就是會話組都是一樣的。那么如果將他們置于前臺運行會發(fā)生什么呢
可以看到,置于前臺運行后,命令行輸入什么都沒有反應(yīng)了。也就是說,此時的bash被自動的放到了后臺運行,證實了一個會話組只能有一個前臺進程
輸入ctr + Z 之后前臺的進程就會把切回后臺,但是切回后臺后進程是阻塞狀態(tài)的,因此輸入bg + 作業(yè)號就可讓進程啟動。
服務(wù)端守護進程化
那么很顯然,在業(yè)務(wù)邏輯上服務(wù)端肯定是需要守護進程化的。因為服務(wù)端沒有特殊情況是不會關(guān)閉的,需要一直運行。如果服務(wù)端是前臺進程的話,那服務(wù)端運行時bash都不能用了,顯然不符合。
這里要介紹一個接口:
#include <unistd.h>
pid_t setsid(void);
這個接口的作用是使調(diào)用的進程獨立成為一個會話組并且為該組的組長。但是調(diào)用這個接口是有前置條件的:調(diào)用這個接口的進程不能為某個會話組的組長
守護進程化的步驟:
- 讓調(diào)用進程忽略掉異常信號,因為其不受終端控制的
- 讓調(diào)用進程不為組長
- 關(guān)閉或者重定向之前默認(rèn)打開的文件,如0 1 2文件描述符
#pragma once
#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define DEV "/dev/null"
void daemonSelf(const char *currPath = nullptr)
{
// 1. 讓調(diào)用進程忽略掉異常的信號
signal(SIGPIPE, SIG_IGN);
// 2. 讓自己不是組長,setsid
if (fork() > 0)
exit(0);
// 子進程 -- 守護進程,精靈進程,本質(zhì)就是孤兒進程的一種!
pid_t n = setsid();
assert(n != -1);
// 3. 守護進程是脫離終端的,關(guān)閉或者重定向以前進程默認(rèn)打開的文件
int fd = open(DEV, O_RDWR);
if(fd >= 0)
{
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
else
{
close(0);
close(1);
close(2);
}
}
接著只需要服務(wù)端在初始化完成后調(diào)用這個函數(shù),將自己設(shè)為守護進程化即可
一起來看看效果:
文章來源:http://www.zghlxwxcb.cn/news/detail-628472.html
可以看到服務(wù)端啟動后并不會影響bash,仍然可以在bash上輸入指令去執(zhí)行??蛻舳艘材軌蚝芎玫慕邮盏綌?shù)據(jù),這就符合現(xiàn)實中服務(wù)端的邏輯。文章來源地址http://www.zghlxwxcb.cn/news/detail-628472.html
到了這里,關(guān)于利用線程池多線程并發(fā)實現(xiàn)TCP兩端通信交互,并將服務(wù)端設(shè)為守護進程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!