国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

線程池-手寫線程池C++11版本(生產(chǎn)者-消費(fèi)者模型)

這篇具有很好參考價(jià)值的文章主要介紹了線程池-手寫線程池C++11版本(生產(chǎn)者-消費(fèi)者模型)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

本項(xiàng)目是基于C++11的線程池。使用了許多C++的新特性,包含不限于模板函數(shù)泛型編程、std::future、std::packaged_task、std::bind、std::forward完美轉(zhuǎn)發(fā)、std::make_shared智能指針、decltype類型推斷、std::unique_lock鎖等C++11新特性功能。


本項(xiàng)目有一定的上手難度。推薦參考系列文章

C++11實(shí)用技術(shù)(一)auto與decltype的使用

C++11實(shí)用技術(shù)(二)std::function和bind綁定器

C++11實(shí)用技術(shù)(三)std::future、std::promise、std::packaged_task、async

C++ operator關(guān)鍵字的使用(重載運(yùn)算符、仿函數(shù)、類型轉(zhuǎn)換操作符)

右值引用以及move移動(dòng)語義和forward 完美轉(zhuǎn)發(fā)

C++標(biāo)準(zhǔn)庫中的鎖lock_guard、unique_lock、shared_lock、scoped_lock、recursive_mutex


代碼結(jié)構(gòu)

本項(xiàng)目線程池功能分以下幾個(gè)函數(shù)去實(shí)現(xiàn):

threadpool.init(isize_t num);設(shè)置線程的數(shù)量
threadpool::get(TaskFuncPtr& task);讀取任務(wù)隊(duì)列中的任務(wù)
threadpool::run();通過get()讀取任務(wù)并執(zhí)行
threadpool.start(); 啟動(dòng)線程池,并通過run()執(zhí)行任務(wù)
threadpool.exec();封裝任務(wù)到任務(wù)隊(duì)列中
threadpool.waitForAllDone();等待所有任務(wù)執(zhí)行完成
threadpool.stop();分離線程,釋放內(nèi)存

threadpool.init

init的功能是初始化線程池,主要是設(shè)置線程的數(shù)量到類的成員變量中。

bool ZERO_ThreadPool::init(size_t num)
{
	std::unique_lock<std::mutex> lock(mutex_);

	if (!threads_.empty())
	{
		return false;
	}

	threadNum_ = num;
	return true;
}

threadNum_:保存線程的數(shù)量,在init函數(shù)中被賦值

此處使用unique_lock或lock_guard的加鎖方式都能實(shí)現(xiàn)自動(dòng)加鎖和解鎖。但是unique_lock可以進(jìn)行臨時(shí)解鎖和再上鎖,而lock_guard不行,特殊情況下還是必須使用unique_lock(用到條件變量的情況)。(lock_guard比較簡單,相對(duì)來說性能要好一點(diǎn))

threadpool::get

從任務(wù)隊(duì)列中獲取獲取任務(wù),這里其實(shí)就是我們的消費(fèi)者模塊。

bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
	std::unique_lock<std::mutex> lock(mutex_);
	if (tasks_.empty()) //判斷任務(wù)是否存在
	{
		//要終止線程池   bTerminate_設(shè)置為true,任務(wù)隊(duì)列不為空
		condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });
	}
	if (bTerminate_)
		return false;
	if (!tasks_.empty())
	{
		task = std::move(tasks_.front());  // 使用了移動(dòng)語義
		tasks_.pop(); //釋放資源,釋放一個(gè)任務(wù)
		return true;
	}
	return false;
}
  • 條件變量condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });是需要一直等待條件完成才退出。即任務(wù)終止,或者任務(wù)隊(duì)列不為空時(shí),就會(huì)退出條件變量的阻塞狀態(tài),繼續(xù)執(zhí)行下面的邏輯。

  • task = std::move(tasks_.front()); 使用了移動(dòng)語義,將 tasks_.front() 的內(nèi)容移動(dòng)到了 task 中??梢詼p少內(nèi)容拷貝。移動(dòng)完之后tasks_.front() 的內(nèi)容會(huì)變?yōu)槲粗付ǖ臓顟B(tài),所以直接pop掉就好了。

threadpool::run

這里是運(yùn)行我們的任務(wù)部分。包括調(diào)用get在任務(wù)隊(duì)列中獲取任務(wù),以及執(zhí)行任務(wù)。

void ZERO_ThreadPool::run()  // 執(zhí)行任務(wù)的線程
{
	//調(diào)用處理部分
	while (!isTerminate()) // 判斷是不是要停止
	{
		TaskFuncPtr task;
		bool ok = get(task);        // 讀取任務(wù)
		if (ok)
		{
			++atomic_;
			try
			{
				if (task->_expireTime != 0 && task->_expireTime < TNOWMS)
				{//如果設(shè)置了超時(shí),并且超時(shí)了,就需要執(zhí)行本邏輯
				//超時(shí)任務(wù),本代碼未實(shí)現(xiàn),有需要可實(shí)現(xiàn)在此處
				}
				else
				{
					task->_func();  // 執(zhí)行任務(wù)
				}
			}
			catch (...)
			{
			}
			--atomic_;
			}
		}
	}
}

atomic_:運(yùn)行一個(gè)任務(wù),該參數(shù)+1;執(zhí)行完畢,該參數(shù)-1。這里是為了待會(huì)停止線程池時(shí)判斷是否還有運(yùn)行中的任務(wù)(未完成的線程)。

threadpool.start

創(chuàng)建線程,并把線程池存入vector中,后面釋放線程池時(shí),好一一釋放線程。

bool ZERO_ThreadPool::start()
{
	std::unique_lock<std::mutex> lock(mutex_);
	if (!threads_.empty())
	{
		return false;
	}
	for (size_t i = 0; i < threadNum_; i++)
	{
		threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
	}
	return true;
}

threads_.push_back(new thread(&ZERO_ThreadPool::run, this));創(chuàng)建線程,線程的回調(diào)函數(shù)為run。

threadpool.exec

exec是將我們的任務(wù)存入任務(wù)隊(duì)列中,這段代碼是本項(xiàng)目最難的,用了很多C++的新特性。

/*
	template <class F, class... Args>
	它是c++里新增的最強(qiáng)大的特性之一,它對(duì)參數(shù)進(jìn)行了高度泛化,它能表示0到任意個(gè)數(shù)、任意類型的參數(shù)
	auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
	std::future<decltype(f(args...))>:返回future,調(diào)用者可以通過future獲取返回值
	返回值后置
	*/
	template <class F, class... Args>
	auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>//接受一個(gè)超時(shí)時(shí)間 `timeoutMs`,一個(gè)可調(diào)用對(duì)象 `f` 和其它參數(shù) `args...`,并返回一個(gè) `std::future` 對(duì)象,該對(duì)象可以用于獲取任務(wù)執(zhí)行的結(jié)果。
	{
		int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 根據(jù)超時(shí)時(shí)間計(jì)算任務(wù)的過期時(shí)間 `expireTime`,如果超時(shí)時(shí)間為 0,則任務(wù)不會(huì)過期。
		//定義返回值類型
		using RetType = decltype(f(args...));  // 使用 `decltype` 推導(dǎo)出 `f(args...)` 的返回值類型,并將其定義為 `RetType`(這里的using和typedef功能一樣,就是為一個(gè)類型起一個(gè)別名)。
		// 封裝任務(wù) 使用 `std::packaged_task` 將可調(diào)用對(duì)象 `f` 和其它參數(shù) `args...` 封裝成一個(gè)可執(zhí)行的函數(shù),并將其存儲(chǔ)在一個(gè) `std::shared_ptr` 對(duì)象 `task` 中。
		auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

		TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封裝任務(wù)指針,設(shè)置過期時(shí)間 創(chuàng)建一個(gè) `TaskFunc` 對(duì)象,并將任務(wù)的過期時(shí)間 `expireTime` 傳遞給它。
		fPtr->_func = [task]() {  // 具體執(zhí)行的函數(shù) 將封裝好的任務(wù)函數(shù)存儲(chǔ)在 `TaskFunc` 對(duì)象的 `_func` 成員中,該函數(shù)會(huì)在任務(wù)執(zhí)行時(shí)被調(diào)用。
			(*task)();
		};

		std::unique_lock<std::mutex> lock(mutex_);
		tasks_.push(fPtr);              // 將任務(wù)插入任務(wù)隊(duì)列中
		condition_.notify_one();        // 喚醒阻塞的線程,可以考慮只有任務(wù)隊(duì)列為空的情況再去notify

		return task->get_future();; //返回一個(gè) `std::future` 對(duì)象,該對(duì)象可以用于獲取任務(wù)執(zhí)行的結(jié)果。
	}

使用了可變參數(shù)模板函數(shù)。
tasks_:保存任務(wù)的隊(duì)列
condition_.notify_one():保存一個(gè)任務(wù)喚醒一個(gè)條件變量
std::future : 異步指向某個(gè)任務(wù),然后通過future特性去獲取任務(wù)函數(shù)的返回結(jié)果。
std::bind:將參數(shù)列表和函數(shù)綁定,生成一個(gè)新的可調(diào)用對(duì)象
std::packaged_task:將任務(wù)和feature綁定在一起的模板,是一種封裝對(duì)任務(wù)的封裝。

本函數(shù)用到了泛型編程模板函數(shù),輸入?yún)?shù)有3個(gè):一個(gè)超時(shí)時(shí)間 timeoutMs,一個(gè)可調(diào)用對(duì)象 f 和參數(shù) args...。采用返回值后置的方式返回一個(gè)std::future對(duì)象。這里采用返回值后置是為了方便使用decltype(f(args…)推導(dǎo)數(shù)據(jù)類型。

auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward(f), std::forward(args)…));是將我們傳進(jìn)來的任務(wù)函數(shù)和參數(shù)bind成一個(gè)對(duì)象,這個(gè)對(duì)象可以看作是一整個(gè)函數(shù),其返回值就是RetType 類型,并且沒有輸入?yún)?shù)。所以用std::packaged_task<RetType()>這樣的格式來打包封裝。封裝好的對(duì)象用智能指針(std::make_shared)來管理。

同時(shí)還要?jiǎng)?chuàng)建一個(gè)TaskFunc的對(duì)象,同樣用智能指針管理,這個(gè)對(duì)象包括兩項(xiàng)內(nèi)容,一個(gè)就是超時(shí)時(shí)間,一個(gè)就是我們封裝好的task對(duì)象。
通過TaskFuncPtr fPtr = std::make_shared(expireTime);和fPtr->_func = task {(*task)();};兩條代碼將這兩項(xiàng)傳進(jìn)去。

最后會(huì)通過task->get_future()返回我們?nèi)蝿?wù)函數(shù)執(zhí)行的結(jié)果返回值。

threadpool.waitForAllDone

等待所有任務(wù)執(zhí)行完成。

bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
	std::unique_lock<std::mutex> lock(mutex_);
	if (tasks_.empty() && atomic_ == 0)
		return true;
	if (millsecond < 0)
	{
		condition_.wait(lock, [this] { return tasks_.empty() && atomic_ == 0; });
		return true;
	}
	else
	{
		return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty() && atomic_ == 0; });
	}
}

使用條件變量來等待任務(wù)執(zhí)行完成。支持超時(shí)執(zhí)行功能。

此處unique_lock的使用是必須的: 條件變量condition_在wait時(shí)會(huì)進(jìn)行unlock再進(jìn)入休眠, lock_guard并無該操作接口

threadpool.stop

終止線程池。會(huì)調(diào)用waitForAllDone等待所有任務(wù)執(zhí)行完成再終止。

void ZERO_ThreadPool::stop()
{
	{
		std::unique_lock<std::mutex> lock(mutex_);
		bTerminate_ = true;
		condition_.notify_all();
	}
	waitForAllDone();
	for (size_t i = 0; i < threads_.size(); i++)
	{
		if (threads_[i]->joinable())
		{
			threads_[i]->join();
		}
		delete threads_[i];
		threads_[i] = NULL;
	}
	std::unique_lock<std::mutex> lock(mutex_);
	threads_.clear();
}

通過join等線程執(zhí)行完成后才返回。

主函數(shù)調(diào)用

class Test
{
public:
	int test(int i) {
		cout << _name << ", i = " << i << endl;
		Sleep(1000);
		return i;
	}
	void setName(string name) {
		_name = name;
	}
	string _name;
};
void test3() // 測(cè)試類對(duì)象函數(shù)的綁定
{
	ZERO_ThreadPool threadpool;
	threadpool.init(2);
	threadpool.start(); // 啟動(dòng)線程池
	Test t1;
	Test t2;
	t1.setName("Test1");
	t2.setName("Test2");
	auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
	auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
	cout << "t1 " << f1.get() << endl;
	cout << "t2 " << f2.get() << endl;
	threadpool.stop();
}
int main()
{
	test3(); // 測(cè)試類對(duì)象函數(shù)的綁定
	cout << "main finish!" << endl;
	return 0;
}

運(yùn)行結(jié)果:
線程池-手寫線程池C++11版本(生產(chǎn)者-消費(fèi)者模型),C++進(jìn)階,c++,java,開發(fā)語言

本項(xiàng)目完整代碼下載地址基于C++11的線程池文章來源地址http://www.zghlxwxcb.cn/news/detail-635683.html

到了這里,關(guān)于線程池-手寫線程池C++11版本(生產(chǎn)者-消費(fèi)者模型)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 線程同步--生產(chǎn)者消費(fèi)者模型

    線程同步--生產(chǎn)者消費(fèi)者模型

    條件變量是 線程間共享的全局變量 ,線程間可以通過條件變量進(jìn)行同步控制 條件變量的使用必須依賴于互斥鎖以確保線程安全,線程申請(qǐng)了互斥鎖后,可以調(diào)用特定函數(shù) 進(jìn)入條件變量等待隊(duì)列(同時(shí)釋放互斥鎖) ,其他線程則可以通過條件變量在特定的條件下喚醒該線程( 喚醒后線

    2024年01月19日
    瀏覽(26)
  • 【Linux】線程安全-生產(chǎn)者消費(fèi)者模型

    【Linux】線程安全-生產(chǎn)者消費(fèi)者模型

    1個(gè)線程安全的隊(duì)列:只要保證先進(jìn)先出特性的數(shù)據(jù)結(jié)構(gòu)都可以稱為隊(duì)列 這個(gè)隊(duì)列要保證互斥(就是保證當(dāng)前只有一個(gè)線程對(duì)隊(duì)列進(jìn)行操作,其他線程不可以同時(shí)來操作),還要保證同步,當(dāng)生產(chǎn)者將隊(duì)列中填充滿了之后要通知消費(fèi)者來進(jìn)行消費(fèi),消費(fèi)者消費(fèi)之后通知生產(chǎn)者

    2024年02月10日
    瀏覽(25)
  • 線程同步--生產(chǎn)者消費(fèi)者模型--單例模式線程池

    線程同步--生產(chǎn)者消費(fèi)者模型--單例模式線程池

    條件變量是 線程間共享的全局變量 ,線程間可以通過條件變量進(jìn)行同步控制 條件變量的使用必須依賴于互斥鎖以確保線程安全,線程申請(qǐng)了互斥鎖后,可以調(diào)用特定函數(shù) 進(jìn)入條件變量等待隊(duì)列(同時(shí)釋放互斥鎖) ,其他線程則可以通過條件變量在特定的條件下喚醒該線程( 喚醒后線

    2024年01月20日
    瀏覽(22)
  • C#多線程學(xué)習(xí)(三) 生產(chǎn)者和消費(fèi)者

    線程學(xué)習(xí)第一篇: C#多線程學(xué)習(xí)(一) 多線程的相關(guān)概念 線程學(xué)習(xí)第二篇: C#多線程學(xué)習(xí)(二) 如何操縱一個(gè)線程 前面說過,每個(gè)線程都有自己的資源,但是代碼區(qū)是共享的,即每個(gè)線程都可以執(zhí)行相同的函數(shù)。這可能帶來的問題就是幾個(gè)線程同時(shí)執(zhí)行一個(gè)函數(shù),導(dǎo)致數(shù)據(jù)的混

    2023年04月21日
    瀏覽(22)
  • python爬蟲,多線程與生產(chǎn)者消費(fèi)者模式

    使用隊(duì)列完成生產(chǎn)者消費(fèi)者模式 使用類創(chuàng)建多線程提高爬蟲速度 通過隊(duì)列可以讓線程之間進(jìn)行通信 創(chuàng)建繼承Thread的類創(chuàng)建線程,run()會(huì)在線程start時(shí)執(zhí)行 吃cpu性能

    2024年02月09日
    瀏覽(18)
  • 探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    目錄 1. 多線程安全 1.1. 生產(chǎn)者是多線程安全的么? 1.1. 消費(fèi)者是多線程安全的么? 2. 消費(fèi)者規(guī)避多線程安全方案 2.1. 每個(gè)線程維護(hù)一個(gè)kafkaConsumer 2.2. [單/多]kafkaConsumer實(shí)例 + 多worker線程 2.3.方案優(yōu)缺點(diǎn)對(duì)比 ????????Kafka生產(chǎn)者是 線程安全 的,可以在多個(gè)線程中共享一個(gè)

    2023年04月26日
    瀏覽(24)
  • 多線程(初階七:阻塞隊(duì)列和生產(chǎn)者消費(fèi)者模型)

    多線程(初階七:阻塞隊(duì)列和生產(chǎn)者消費(fèi)者模型)

    目錄 一、阻塞隊(duì)列的簡單介紹 二、生產(chǎn)者消費(fèi)者模型 1、舉個(gè)栗子: 2、引入生產(chǎn)者消費(fèi)者模型的意義: (1)解耦合 (2)削峰填谷 三、模擬實(shí)現(xiàn)阻塞隊(duì)列 1、阻塞隊(duì)列的簡單介紹 2、實(shí)現(xiàn)阻塞隊(duì)列 (1)實(shí)現(xiàn)普通隊(duì)列 (2)加上線程安全 (3)加上阻塞功能 3、運(yùn)用阻塞隊(duì)列

    2024年02月05日
    瀏覽(20)
  • JavaEE 初階篇-生產(chǎn)者與消費(fèi)者模型(線程通信)

    JavaEE 初階篇-生產(chǎn)者與消費(fèi)者模型(線程通信)

    ??博客主頁:?【 小扳_-CSDN博客】 ?感謝大家點(diǎn)贊??收藏?評(píng)論? ? 文章目錄 ? ? ? ? 1.0 生產(chǎn)者與消費(fèi)者模型概述 ? ? ? ? 2.0?在生產(chǎn)者與消費(fèi)者模型中涉及的關(guān)鍵概念 ? ? ? ? 2.1 緩沖區(qū) ? ? ? ? 2.2 生產(chǎn)者 ? ? ? ? 2.3 消費(fèi)者 ? ? ? ? 2.4 同步機(jī)制 ? ? ? ? 2.5 線程間通

    2024年04月28日
    瀏覽(31)
  • C# 快速寫入日志 不卡線程 生產(chǎn)者 消費(fèi)者模式

    C# 快速寫入日志 不卡線程 生產(chǎn)者 消費(fèi)者模式

    有這樣一種場景需求,就是某個(gè)方法,對(duì)耗時(shí)要求很高,但是又要記錄日志到數(shù)據(jù)庫便于分析,由于訪問數(shù)據(jù)庫基本都要幾十毫秒,可在方法里寫入BlockingCollection,由另外的線程寫入數(shù)據(jù)庫。 可以看到,在我的機(jī)子上面,1ms寫入了43條日志。

    2024年02月15日
    瀏覽(23)
  • 【linux】線程同步+基于BlockingQueue的生產(chǎn)者消費(fèi)者模型

    【linux】線程同步+基于BlockingQueue的生產(chǎn)者消費(fèi)者模型

    喜歡的點(diǎn)贊,收藏,關(guān)注一下把! 在線程互斥寫了一份搶票的代碼,我們發(fā)現(xiàn)雖然加鎖解決了搶到負(fù)數(shù)票的問題,但是一直都是一個(gè)線程在搶票,它錯(cuò)了嗎,它沒錯(cuò)但是不合理。那我們應(yīng)該如何安全合理的搶票呢? 講個(gè)小故事。 假設(shè)學(xué)校有一個(gè)VIP學(xué)霸自習(xí)室,這個(gè)自習(xí)室有

    2024年02月03日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包