Linux知識點 – Linux多線程(四)
一、線程池
1.概念
一種線程使用模式。線程過多會帶來調(diào)度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務。這避免了在處理短時間任務時創(chuàng)建與銷毀線程的代價。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度。可用線程數(shù)量應該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡sockets等的數(shù)量。
- 預先申請資源,用空間換時間;
- 預先申請一批線程,任務到來就處理;
- 線程池就是一個生產(chǎn)消費模型;
2.實現(xiàn)
thread.hpp
線程封裝:
#pragma once
#include<iostream>
#include<string>
#include<functional>
#include<cstdio>
typedef void* (*fun_t)(void*); // 定義函數(shù)指針類型,后面回調(diào)
class ThreadData // 線程信息結(jié)構(gòu)體
{
public:
void* _args;
std::string _name;
};
class Thread
{
public:
Thread(int num, fun_t callback, void* args)
: _func(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num);
_name = nameBuffer;
_tdata._args = args;
_tdata._name = _name;
}
void start() // 創(chuàng)建線程
{
pthread_create(&_tid, nullptr, _func, (void*)&_tdata); // 直接將_tdata作為參數(shù)傳給回調(diào)函數(shù)
}
void join() // 線程等待
{
pthread_join(_tid, nullptr);
}
std::string name()
{
return _name;
}
~Thread()
{}
private:
std::string _name;
fun_t _func;
ThreadData _tdata;
pthread_t _tid;
};
lockGuard.hpp
鎖的封裝,構(gòu)建對象時直接加鎖,對象析構(gòu)時自動解鎖;
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx)
: _pmtx(mtx)
{
}
void lock()
{
pthread_mutex_lock(_pmtx);
}
void unlock()
{
pthread_mutex_unlock(_pmtx);
}
~Mutex()
{}
private:
pthread_mutex_t *_pmtx;
};
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx)
: _mtx(mtx)
{
_mtx.lock();
}
~lockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
log.hpp
#pragma once
#include<iostream>
#include<cstdio>
#include<cstdarg>
#include<ctime>
#include<string>
//日志級別
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char* gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
//完整的日志功能,至少需要:日志等級 時間 支持用戶自定義(日志內(nèi)容,文件行,文件名)
void logMessage(int level, const char* format, ...)
{
#ifndef DEBUG_SHOW
if(level == DEBUG) return;
#endif
char stdBuffer[1024];//標準部分
time_t timestamp = time(nullptr);
snprintf(stdBuffer, sizeof(stdBuffer), "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024];//自定義部分
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof(logBuffer), format, args);
va_end(args);
FILE* fp = fopen(LOGFILE, "a");
fprintf(fp, "%s %s\n", stdBuffer, logBuffer);
fclose(fp);
}
- 注:
(1)提取可變參數(shù)
使用宏來提取可變參數(shù):
將可變參數(shù)格式化打印到對應地點:
format是打印的格式;
(2)條件編譯:
條件編譯,不想調(diào)試的時候,就不加DEBUG宏,不打印日志信息;
-D:在命令行定義宏 ;
threadPool.hpp
線程池封裝:
#include "thread.hpp"
#include <vector>
#include <queue>
#include <unistd.h>
#include "log.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"
const int g_thread_num = 3;
template <class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &_lock;
}
bool isEmpty()
{
return _task_queue.empty();
}
void waitCond()
{
pthread_cond_wait(&_cond, &_lock);
}
T getTask()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
ThreadPool(int thread_num = g_thread_num)
: _num(thread_num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 1; i <= _num; i++)
{
_threads.push_back(new Thread(i, routine, this));
// 線程構(gòu)造傳入的this指針,是作為ThreadData結(jié)構(gòu)體的參數(shù)的,ThreadData結(jié)構(gòu)體才是routine回調(diào)函數(shù)的參數(shù)
// 由于由于回調(diào)函數(shù)是靜態(tài)成員,無法訪問非靜態(tài)成員
// 這里傳入this指針是用來在回調(diào)函數(shù)中給訪問非靜態(tài)成員的
}
}
void run()
{
for (auto &iter : _threads)
{
iter->start();
logMessage(NORMAL, "%s %s", iter->name().c_str(), "啟動成功");
}
}
// 消費過程:線程調(diào)用回調(diào)函數(shù)取任務就是所謂的消費過程,訪問了臨界資源,需要加鎖
static void *routine(void *args) //由于這個函數(shù)是類內(nèi)成員,參數(shù)是有this指針的,參數(shù)類型不對,因此多線程回調(diào)的時候無法識別
//需要設(shè)置成static靜態(tài)成員,才可以回調(diào)
{
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->_args; // 拿到this指針,通過本對象的this指針來調(diào)用成員函數(shù)
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex());
while (tp->isEmpty())
{
tp->waitCond();
}
// 讀取任務
task = tp->getTask();
// 任務隊列是共享的,將任務從共享空間,拿到私有空間
}
task(td->_name); // 處理任務
}
}
void pushTask(const T &task)
{
lockGuard lockguard(&_lock); // 訪問臨界資源,需要加鎖
_task_queue.push(task);
pthread_cond_signal(&_cond); // 推送任務后,發(fā)送信號,讓進程處理
}
~ThreadPool()
{
for (auto &iter : _threads)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread *> _threads; // 線程池
int _num;
std::queue<T> _task_queue; // 任務隊列
pthread_mutex_t _lock; // 鎖
pthread_cond_t _cond; // 條件變量
};
- 注:
(1)如果回調(diào)函數(shù)routine放在thread類里面,由于成員函數(shù)會默認傳this指針,因此參數(shù)識別的時候可能會出錯,所以需要設(shè)置成靜態(tài)成員;
(2)如果設(shè)置成靜態(tài)類內(nèi)方法,這個函數(shù)只能使用靜態(tài)成員,而不能使用其他類內(nèi)成員;
可以讓routine函數(shù)拿到整體對象,在構(gòu)造線程的時候,routine的參數(shù)傳入this指針;
在構(gòu)造函數(shù)的初始化列表中是參數(shù)的初始化,在下面的函數(shù)體中是賦值的過程,因此在函數(shù)體中對象已經(jīng)存在了,就可以使用this指針了;
(3)類內(nèi)公有接口讓靜態(tài)成員函數(shù)routine通過this指針能夠訪問類內(nèi)成員;
testMain.cc
#include"threadPool.hpp"
#include"Task.hpp"
#include<ctime>
#include<cstdlib>
#include<iostream>
#include<unistd.h>
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
ThreadPool<Task>* tp = new ThreadPool<Task>();
tp->run();
while(true)
{
//生產(chǎn)的時候,只做任務要花時間
int x = rand()%100 + 1;
usleep(7756);
int y = rand()%30 + 1;
Task t(x, y, [](int x, int y)->int{
return x + y;
});
logMessage(DEBUG, "制作任務完成:%d+%d=?", x, y);
//推送任務到線程池中
tp->pushTask(t);
sleep(1);
}
return 0;
}
運行結(jié)果:
3.單例模式的線程池
threadPool.hpp
#include "thread.hpp"
#include <vector>
#include <queue>
#include <unistd.h>
#include "log.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"
const int g_thread_num = 3;
template <class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &_lock;
}
bool isEmpty()
{
return _task_queue.empty();
}
void waitCond()
{
pthread_cond_wait(&_cond, &_lock);
}
T getTask()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
//單例模式線程池:懶漢模式
private:
//構(gòu)造函數(shù)設(shè)為私有
ThreadPool(int thread_num = g_thread_num)
: _num(thread_num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 1; i <= _num; i++)
{
_threads.push_back(new Thread(i, routine, this));
// 線程構(gòu)造傳入的this指針,是作為ThreadData結(jié)構(gòu)體的參數(shù)的,ThreadData結(jié)構(gòu)體才是routine回調(diào)函數(shù)的參數(shù)
}
}
ThreadPool(const ThreadPool<T> &other) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
public:
//創(chuàng)建單例對象的類內(nèi)靜態(tài)成員函數(shù)
static ThreadPool<T>* getThreadPool(int num = g_thread_num)
{
//在這里再加上一個條件判斷,可以有效減少未來必定要進行的加鎖檢測的問題
//攔截大量的在已經(jīng)創(chuàng)建好單例的時候,剩余線程請求單例而直接申請鎖的行為
if(nullptr == _thread_ptr)
{
//加鎖
lockGuard lockguard(&_mutex);
//未來任何一個線程想要獲取單例,都必須調(diào)用getThreadPool接口
//一定會存在大量的申請鎖和釋放鎖的行為,無用且浪費資源
if(nullptr == _thread_ptr)
{
_thread_ptr = new ThreadPool<T>(num);
}
}
return _thread_ptr;
}
void run()
{
for (auto &iter : _threads)
{
iter->start();
logMessage(NORMAL, "%s %s", iter->name().c_str(), "啟動成功");
}
}
// 消費過程:線程調(diào)用回調(diào)函數(shù)取任務就是所謂的消費過程,訪問了臨界資源,需要加鎖
static void *routine(void *args)
{
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->_args; // 拿到this指針
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex());
while (tp->isEmpty())
{
tp->waitCond();
}
// 讀取任務
task = tp->getTask();
// 任務隊列是共享的,將任務從共享空間,拿到私有空間
}
task(td->_name); // 處理任務
}
}
void pushTask(const T &task)
{
lockGuard lockguard(&_lock); // 訪問臨界資源,需要加鎖
_task_queue.push(task);
pthread_cond_signal(&_cond); // 推送任務后,發(fā)送信號,讓進程處理
}
~ThreadPool()
{
for (auto &iter : _threads)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread *> _threads; // 線程池
int _num;
std::queue<T> _task_queue; // 任務隊列
static ThreadPool<T>* _thread_ptr;
static pthread_mutex_t _mutex;
pthread_mutex_t _lock; // 鎖
pthread_cond_t _cond; // 條件變量
};
//靜態(tài)成員在類外初始化
template<class T>
ThreadPool<T>* ThreadPool<T>::_thread_ptr = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::_mutex = PTHREAD_MUTEX_INITIALIZER;
多線程同時調(diào)用單例過程,由于創(chuàng)建過程是非原子的,有可能被創(chuàng)建多個對象,是非線程安全的;
需要對創(chuàng)建對象的過程加鎖,就可以保證在多線程場景當中獲取單例對象;
但是未來任何一個線程想調(diào)用單例對象,都必須調(diào)用這個成員函數(shù),就會存在大量申請和釋放鎖的行為;
可以在之間加一個對單例對象指針的判斷,若不為空,就不進行對象創(chuàng)建;
testMain.cc
#include"threadPool.hpp"
#include"Task.hpp"
#include<ctime>
#include<cstdlib>
#include<iostream>
#include<unistd.h>
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
//ThreadPool<Task>* tp = new ThreadPool<Task>();
//tp->run();
ThreadPool<Task>::getThreadPool()->run();//創(chuàng)建單例對象
while(true)
{
//生產(chǎn)的時候,只做任務要花時間
int x = rand()%100 + 1;
usleep(7756);
int y = rand()%30 + 1;
Task t(x, y, [](int x, int y)->int{
return x + y;
});
logMessage(DEBUG, "制作任務完成:%d+%d=?", x, y);
//推送任務到線程池中
ThreadPool<Task>::getThreadPool()->pushTask(t);
sleep(1);
}
return 0;
}
運行結(jié)果:
二、STL、智能指針和線程安全
1.STL的容器是否是線程安全的
不是;
原因是, STL的設(shè)計初衷是將性能挖掘到極致,而一旦涉及到加鎖保證線程安全,會對性能造成巨大的影響;
而且對于不同的容器,加鎖方式的不同,性能可能也不同(例如hash表的鎖表和鎖桶)。
因此STL默認不是線程安全。如果需要在多線程環(huán)境下使用,往往需要調(diào)用者自行保證線程安全。
2.智能指針是否是線程安全的
對于unique_ ptr,由于只是在當前代碼塊范圍內(nèi)生效,因此不涉及線程安全問題;
對于shared_ptr,多個對象需要共用一個引用計數(shù)變量,所以會存在線程安全問題.但是標準庫實現(xiàn)的時候考慮到了這個問題,基于原子操作(CAS)的方式保證shared_ptr 能夠高效,原子的操作弓|用計數(shù);
三、其他常見的各種鎖
- 悲觀鎖:在每次取數(shù)據(jù)時,總是擔心數(shù)據(jù)會被其他線程修改,所以會在取數(shù)據(jù)前先加鎖(讀鎖,寫鎖,行鎖等) ,當其他線程想要訪問數(shù)據(jù)時,被阻塞掛起;
-
樂觀鎖:每次取數(shù)據(jù)時候,總是樂觀的認為數(shù)據(jù)不會被其他線程修改,因此不上鎖。但是在更新數(shù)據(jù)前,會判斷其他數(shù)據(jù)在更新前有沒有對數(shù)據(jù)進行修改。主要采用兩種方式:版本號機制和CAS操作;
CAS操作:當需要更新數(shù)據(jù)時,判斷當前內(nèi)存值和之前取得的值是否相等。如果相等則用新值更新。若不等則失敗,失敗則重試,一般是一個自旋的過程,即不斷重試; -
自旋鎖
臨界資源就緒的時間決定了線程等待的策略;
不斷檢測資源是否就緒就是自旋(輪詢檢測);
自旋鎖本質(zhì)就是通過不斷檢測鎖狀態(tài),來檢測資源是否就緒的方案;
互斥鎖是檢測到資源未就緒,就掛起線程;
臨界資源就緒的時間決定了使用哪種鎖;
四、讀者寫者問題
1.讀寫鎖
在編寫多線程的時候,有一種情況是十分常見的。那就是,有些公共數(shù)據(jù)修改的機會比較少,相比較改寫,它們讀的機會反而高的多。通常而言,在讀的過程中,往往伴隨著查找的操作,中間耗時長。給這種代碼段加鎖,會極大地降低我們程序的效率。那么有沒有一種方法,可以專門]處理這種多讀少寫的情況呢?有,那就是讀寫鎖。
-
讀者寫者模型與生產(chǎn)消費模型的本質(zhì)區(qū)別:
生產(chǎn)消費模型中消費者會取走數(shù)據(jù),而讀者寫者模型中讀者不會取走數(shù)據(jù); -
讀鎖的優(yōu)先級高;
2.讀寫鎖接口
-
初始化:
-
讀者加鎖:
-
寫者加鎖:文章來源:http://www.zghlxwxcb.cn/news/detail-693788.html
生產(chǎn)消費模型中,生產(chǎn)者和消費者的地位是對等的,這樣才能達到最高效的狀態(tài)
而讀寫者模型中,寫者只有在讀者全部退出的時候才能寫,是讀者優(yōu)先的,這樣就會發(fā)生寫者饑餓問題;
讀者寫者問題中讀鎖的優(yōu)先級高,是因為這種模型的應用場景為:數(shù)據(jù)的讀取頻率非常高,而被修改的頻率特別低,這樣有助于提升效率;文章來源地址http://www.zghlxwxcb.cn/news/detail-693788.html
到了這里,關(guān)于Linux知識點 -- Linux多線程(四)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!