本項(xiàng)目是基于C++11的線程池。使用了許多C++的新特性,包含不限于模板函數(shù)泛型編程、std::future、std::packaged_task、std::bind、std::forward完美轉(zhuǎn)發(fā)、std::make_shared智能指針、decltype類型推斷、std::unique_lock鎖等C++11新特性功能。
本項(xiàng)目有一定的上手難度。推薦參考系列文章
C++11實(shí)用技術(shù)(一)auto與decltype的使用
C++11實(shí)用技術(shù)(二)std::function和bind綁定器
C++11實(shí)用技術(shù)(三)std::future、std::promise、std::packaged_task、async
C++ operator關(guān)鍵字的使用(重載運(yùn)算符、仿函數(shù)、類型轉(zhuǎn)換操作符)
右值引用以及move移動(dòng)語義和forward 完美轉(zhuǎn)發(fā)
C++標(biāo)準(zhǔn)庫中的鎖lock_guard、unique_lock、shared_lock、scoped_lock、recursive_mutex
代碼結(jié)構(gòu)
本項(xiàng)目線程池功能分以下幾個(gè)函數(shù)去實(shí)現(xiàn):
threadpool.init(isize_t num);設(shè)置線程的數(shù)量
threadpool::get(TaskFuncPtr& task);讀取任務(wù)隊(duì)列中的任務(wù)
threadpool::run();通過get()讀取任務(wù)并執(zhí)行
threadpool.start(); 啟動(dòng)線程池,并通過run()執(zhí)行任務(wù)
threadpool.exec();封裝任務(wù)到任務(wù)隊(duì)列中
threadpool.waitForAllDone();等待所有任務(wù)執(zhí)行完成
threadpool.stop();分離線程,釋放內(nèi)存
threadpool.init
init的功能是初始化線程池,主要是設(shè)置線程的數(shù)量到類的成員變量中。
bool ZERO_ThreadPool::init(size_t num)
{
std::unique_lock<std::mutex> lock(mutex_);
if (!threads_.empty())
{
return false;
}
threadNum_ = num;
return true;
}
threadNum_:保存線程的數(shù)量,在init函數(shù)中被賦值
此處使用unique_lock或lock_guard的加鎖方式都能實(shí)現(xiàn)自動(dòng)加鎖和解鎖。但是unique_lock可以進(jìn)行臨時(shí)解鎖和再上鎖,而lock_guard不行,特殊情況下還是必須使用unique_lock(用到條件變量的情況)。(lock_guard比較簡單,相對(duì)來說性能要好一點(diǎn))
threadpool::get
從任務(wù)隊(duì)列中獲取獲取任務(wù),這里其實(shí)就是我們的消費(fèi)者模塊。
bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
std::unique_lock<std::mutex> lock(mutex_);
if (tasks_.empty()) //判斷任務(wù)是否存在
{
//要終止線程池 bTerminate_設(shè)置為true,任務(wù)隊(duì)列不為空
condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });
}
if (bTerminate_)
return false;
if (!tasks_.empty())
{
task = std::move(tasks_.front()); // 使用了移動(dòng)語義
tasks_.pop(); //釋放資源,釋放一個(gè)任務(wù)
return true;
}
return false;
}
條件變量condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });是需要一直等待條件完成才退出。即任務(wù)終止,或者任務(wù)隊(duì)列不為空時(shí),就會(huì)退出條件變量的阻塞狀態(tài),繼續(xù)執(zhí)行下面的邏輯。
task = std::move(tasks_.front()); 使用了移動(dòng)語義,將 tasks_.front() 的內(nèi)容移動(dòng)到了 task 中??梢詼p少內(nèi)容拷貝。移動(dòng)完之后tasks_.front() 的內(nèi)容會(huì)變?yōu)槲粗付ǖ臓顟B(tài),所以直接pop掉就好了。
threadpool::run
這里是運(yùn)行我們的任務(wù)部分。包括調(diào)用get在任務(wù)隊(duì)列中獲取任務(wù),以及執(zhí)行任務(wù)。
void ZERO_ThreadPool::run() // 執(zhí)行任務(wù)的線程
{
//調(diào)用處理部分
while (!isTerminate()) // 判斷是不是要停止
{
TaskFuncPtr task;
bool ok = get(task); // 讀取任務(wù)
if (ok)
{
++atomic_;
try
{
if (task->_expireTime != 0 && task->_expireTime < TNOWMS)
{//如果設(shè)置了超時(shí),并且超時(shí)了,就需要執(zhí)行本邏輯
//超時(shí)任務(wù),本代碼未實(shí)現(xiàn),有需要可實(shí)現(xiàn)在此處
}
else
{
task->_func(); // 執(zhí)行任務(wù)
}
}
catch (...)
{
}
--atomic_;
}
}
}
}
atomic_:運(yùn)行一個(gè)任務(wù),該參數(shù)+1;執(zhí)行完畢,該參數(shù)-1。這里是為了待會(huì)停止線程池時(shí)判斷是否還有運(yùn)行中的任務(wù)(未完成的線程)。
threadpool.start
創(chuàng)建線程,并把線程池存入vector中,后面釋放線程池時(shí),好一一釋放線程。
bool ZERO_ThreadPool::start()
{
std::unique_lock<std::mutex> lock(mutex_);
if (!threads_.empty())
{
return false;
}
for (size_t i = 0; i < threadNum_; i++)
{
threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
}
return true;
}
threads_.push_back(new thread(&ZERO_ThreadPool::run, this));創(chuàng)建線程,線程的回調(diào)函數(shù)為run。
threadpool.exec
exec是將我們的任務(wù)存入任務(wù)隊(duì)列中,這段代碼是本項(xiàng)目最難的,用了很多C++的新特性。
/*
template <class F, class... Args>
它是c++里新增的最強(qiáng)大的特性之一,它對(duì)參數(shù)進(jìn)行了高度泛化,它能表示0到任意個(gè)數(shù)、任意類型的參數(shù)
auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
std::future<decltype(f(args...))>:返回future,調(diào)用者可以通過future獲取返回值
返回值后置
*/
template <class F, class... Args>
auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>//接受一個(gè)超時(shí)時(shí)間 `timeoutMs`,一個(gè)可調(diào)用對(duì)象 `f` 和其它參數(shù) `args...`,并返回一個(gè) `std::future` 對(duì)象,該對(duì)象可以用于獲取任務(wù)執(zhí)行的結(jié)果。
{
int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 根據(jù)超時(shí)時(shí)間計(jì)算任務(wù)的過期時(shí)間 `expireTime`,如果超時(shí)時(shí)間為 0,則任務(wù)不會(huì)過期。
//定義返回值類型
using RetType = decltype(f(args...)); // 使用 `decltype` 推導(dǎo)出 `f(args...)` 的返回值類型,并將其定義為 `RetType`(這里的using和typedef功能一樣,就是為一個(gè)類型起一個(gè)別名)。
// 封裝任務(wù) 使用 `std::packaged_task` 將可調(diào)用對(duì)象 `f` 和其它參數(shù) `args...` 封裝成一個(gè)可執(zhí)行的函數(shù),并將其存儲(chǔ)在一個(gè) `std::shared_ptr` 對(duì)象 `task` 中。
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime); // 封裝任務(wù)指針,設(shè)置過期時(shí)間 創(chuàng)建一個(gè) `TaskFunc` 對(duì)象,并將任務(wù)的過期時(shí)間 `expireTime` 傳遞給它。
fPtr->_func = [task]() { // 具體執(zhí)行的函數(shù) 將封裝好的任務(wù)函數(shù)存儲(chǔ)在 `TaskFunc` 對(duì)象的 `_func` 成員中,該函數(shù)會(huì)在任務(wù)執(zhí)行時(shí)被調(diào)用。
(*task)();
};
std::unique_lock<std::mutex> lock(mutex_);
tasks_.push(fPtr); // 將任務(wù)插入任務(wù)隊(duì)列中
condition_.notify_one(); // 喚醒阻塞的線程,可以考慮只有任務(wù)隊(duì)列為空的情況再去notify
return task->get_future();; //返回一個(gè) `std::future` 對(duì)象,該對(duì)象可以用于獲取任務(wù)執(zhí)行的結(jié)果。
}
使用了可變參數(shù)模板函數(shù)。
tasks_:保存任務(wù)的隊(duì)列
condition_.notify_one():保存一個(gè)任務(wù)喚醒一個(gè)條件變量
std::future : 異步指向某個(gè)任務(wù),然后通過future特性去獲取任務(wù)函數(shù)的返回結(jié)果。
std::bind:將參數(shù)列表和函數(shù)綁定,生成一個(gè)新的可調(diào)用對(duì)象
std::packaged_task:將任務(wù)和feature綁定在一起的模板,是一種封裝對(duì)任務(wù)的封裝。
本函數(shù)用到了泛型編程模板函數(shù),輸入?yún)?shù)有3個(gè):一個(gè)超時(shí)時(shí)間 timeoutMs
,一個(gè)可調(diào)用對(duì)象 f
和參數(shù) args...
。采用返回值后置的方式返回一個(gè)std::future對(duì)象。這里采用返回值后置是為了方便使用decltype(f(args…)推導(dǎo)數(shù)據(jù)類型。
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward(f), std::forward(args)…));是將我們傳進(jìn)來的任務(wù)函數(shù)和參數(shù)bind成一個(gè)對(duì)象,這個(gè)對(duì)象可以看作是一整個(gè)函數(shù),其返回值就是RetType 類型,并且沒有輸入?yún)?shù)。所以用std::packaged_task<RetType()>這樣的格式來打包封裝。封裝好的對(duì)象用智能指針(std::make_shared)來管理。
同時(shí)還要?jiǎng)?chuàng)建一個(gè)TaskFunc的對(duì)象,同樣用智能指針管理,這個(gè)對(duì)象包括兩項(xiàng)內(nèi)容,一個(gè)就是超時(shí)時(shí)間,一個(gè)就是我們封裝好的task對(duì)象。
通過TaskFuncPtr fPtr = std::make_shared(expireTime);和fPtr->_func = task {(*task)();};兩條代碼將這兩項(xiàng)傳進(jìn)去。
最后會(huì)通過task->get_future()返回我們?nèi)蝿?wù)函數(shù)執(zhí)行的結(jié)果返回值。
threadpool.waitForAllDone
等待所有任務(wù)執(zhí)行完成。
bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
std::unique_lock<std::mutex> lock(mutex_);
if (tasks_.empty() && atomic_ == 0)
return true;
if (millsecond < 0)
{
condition_.wait(lock, [this] { return tasks_.empty() && atomic_ == 0; });
return true;
}
else
{
return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty() && atomic_ == 0; });
}
}
使用條件變量來等待任務(wù)執(zhí)行完成。支持超時(shí)執(zhí)行功能。
此處unique_lock的使用是必須的: 條件變量condition_在wait時(shí)會(huì)進(jìn)行unlock再進(jìn)入休眠, lock_guard并無該操作接口
threadpool.stop
終止線程池。會(huì)調(diào)用waitForAllDone等待所有任務(wù)執(zhí)行完成再終止。
void ZERO_ThreadPool::stop()
{
{
std::unique_lock<std::mutex> lock(mutex_);
bTerminate_ = true;
condition_.notify_all();
}
waitForAllDone();
for (size_t i = 0; i < threads_.size(); i++)
{
if (threads_[i]->joinable())
{
threads_[i]->join();
}
delete threads_[i];
threads_[i] = NULL;
}
std::unique_lock<std::mutex> lock(mutex_);
threads_.clear();
}
通過join等線程執(zhí)行完成后才返回。
主函數(shù)調(diào)用
class Test
{
public:
int test(int i) {
cout << _name << ", i = " << i << endl;
Sleep(1000);
return i;
}
void setName(string name) {
_name = name;
}
string _name;
};
void test3() // 測(cè)試類對(duì)象函數(shù)的綁定
{
ZERO_ThreadPool threadpool;
threadpool.init(2);
threadpool.start(); // 啟動(dòng)線程池
Test t1;
Test t2;
t1.setName("Test1");
t2.setName("Test2");
auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
cout << "t1 " << f1.get() << endl;
cout << "t2 " << f2.get() << endl;
threadpool.stop();
}
int main()
{
test3(); // 測(cè)試類對(duì)象函數(shù)的綁定
cout << "main finish!" << endl;
return 0;
}
運(yùn)行結(jié)果:文章來源:http://www.zghlxwxcb.cn/news/detail-635683.html
本項(xiàng)目完整代碼下載地址基于C++11的線程池文章來源地址http://www.zghlxwxcb.cn/news/detail-635683.html
到了這里,關(guān)于線程池-手寫線程池C++11版本(生產(chǎn)者-消費(fèi)者模型)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!