????????本篇文章我們使用C++探討一下生產(chǎn)者與消費(fèi)者問題.?
1. 多線程的引入
? ? ? ? 我們學(xué)習(xí)了操作系統(tǒng), 知道了進(jìn)程和線程的概念, 但是如果不進(jìn)行代碼實戰(zhàn)的話, 會很難理解它們. 特別是編程的初學(xué)者(比如我), 在了解了進(jìn)程和線程后通常會感到疑惑: 多線程怎么用? 為啥我平時寫代碼沒有使用到多線程呢?
????????首先我們先看多進(jìn)程, 這個比較好理解, 比如我們新建一個C++項目, 這個項目在運(yùn)行的時候就是一個進(jìn)程, 它和計算機(jī)上面運(yùn)行的其它程序一起工作, 這就是多進(jìn)程. 至于為什么我們在寫代碼的時候, 不需要考慮多進(jìn)程之間的資源分配情況, 我們可以認(rèn)為是操作系統(tǒng)幫我們做了這些事情, 所以我們寫代碼時可以只考慮我們自己寫的代碼邏輯.
????????我們再看多線程, 先回顧一下C++的最基本的一段代碼:
#include <iostream>
int main()
{
std::cout << "Hello World!\n";
}
代碼1: 最基本的C++代碼
????????這段代碼是多線程嗎, 很明顯它不是的, 它是一段單線程的代碼. 寫代碼時我們?nèi)绻麤]有開啟新的線程, 那這段代碼就只會是單線程的代碼. 所以如果我們想要進(jìn)行多線程編程, 就需要用代碼開啟新的線程. 怎么用代碼開啟新的線程呢? 如下所示.
#include <iostream>
#include <thread>
void function_name() {
// 線程函數(shù)的代碼
std::cout << "Hello World!\n";
}
int main() {
// 創(chuàng)建新線程并啟動
std::thread t(function_name);
// 等待新線程結(jié)束
t.join();
return 0;
}
代碼2: 開啟一個線程
????????如上所示, 我們開啟了一個新的線程, 將"Hello World!"的輸出放在了這個線程內(nèi)執(zhí)行, 但這也只是開啟了一個線程, 還不算是嚴(yán)格意義上的多線程. 至少開啟兩個線程才算得上是多線程.?
2. 生產(chǎn)者與消費(fèi)者問題引入
? ? ? ? 現(xiàn)在我們知道了如何開啟新的線程, 就可以介紹一下生產(chǎn)者與消費(fèi)者問題了. 問題很簡單: 有一個容納產(chǎn)品的池子, 生產(chǎn)者不停地生產(chǎn)產(chǎn)品放入池子, 消費(fèi)者不停地從池子拿走產(chǎn)品進(jìn)行消費(fèi). 我們先考慮最簡單的情況: 先不考慮消費(fèi)者, 只考慮有兩個生產(chǎn)者, 而且我們目前只關(guān)心產(chǎn)品的數(shù)量, 不考慮池子的容量. 用itemCount來表示產(chǎn)品的數(shù)量, 代碼該如何寫呢?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 生產(chǎn)者函數(shù)1
/// </summary>
void producer_fun1() {
while (true)
{
printf("生產(chǎn)者1生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //當(dāng)前線程阻塞1秒
}
}
/// <summary>
/// 生產(chǎn)者函數(shù)2
/// </summary>
void producer_fun2() {
while (true)
{
printf("生產(chǎn)者2生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //當(dāng)前線程阻塞1秒
}
}
int main()
{
std::thread producer_thread1(producer_fun1); //生產(chǎn)者線程1
std::thread producer_thread2(producer_fun2); //生產(chǎn)者線程2
producer_thread1.join(); //等待線程結(jié)束
producer_thread1.join();
return 0;
}
? ? ? ?代碼3: 兩個生產(chǎn)者(沒有對線程操作進(jìn)行任何限制)
????????我們看上面的代碼, 似乎沒有什么問題, 開啟了兩個生產(chǎn)者的線程, 讓它們不停地生產(chǎn)產(chǎn)品, 而且我們還讓它們在生產(chǎn)完產(chǎn)品之后"休息"一秒鐘, 很人性化, 但這段代碼的運(yùn)行結(jié)果是我們想要的嗎? 運(yùn)行一下看看, 結(jié)果如下:?
? ? ? ? ?運(yùn)行結(jié)果明顯不盡人意, 我們希望的是每個生產(chǎn)者生產(chǎn)產(chǎn)品之后, 產(chǎn)品數(shù)量能夠有序遞增, 但現(xiàn)在情況明顯不是這樣, 甚至讀者把這段代碼自己運(yùn)行一下, 運(yùn)行結(jié)果也很可能與我的運(yùn)行結(jié)果不一樣, 怎么回事呢? 讀者可以把這段代碼的兩個打印輸出的地方打個斷點, 會發(fā)現(xiàn)代碼不會老老實實地執(zhí)行完一個線程函數(shù)再去執(zhí)行另外一個線程函數(shù), 很詭異的是, 它在一個線程函數(shù)還沒有執(zhí)行完的時候, 轉(zhuǎn)而去執(zhí)行另外一個線程函數(shù)了, 這就是多線程(進(jìn)程)編程有意思的地方, 多線程(進(jìn)程)的存在, 或者說程序的并發(fā)執(zhí)行, 使得程序有了間斷性, 失去封閉性, 失去可再現(xiàn)性.?
? ? ? ? 好了, 我們還是先別考慮更復(fù)雜的情況了, 僅僅是兩個生產(chǎn)者的線程, 就出現(xiàn)了這么大的bug, 還是先想想怎么解決這個bug才能繼續(xù).?
3. 互斥問題引入
? ? ? ? 問題的根源在于, 代碼的執(zhí)行順序被打亂了, 導(dǎo)致了itemCount沒有依次遞增, 你可能會有疑問: 不對呀, 盡管代碼的執(zhí)行順序被打亂了, 但是我們也只是打印輸出了itemCount的值而已, 就算打亂了代碼每一行的執(zhí)行順序, 你總得把這一行代碼執(zhí)行完再去執(zhí)行別得行的代碼吧, 現(xiàn)在卻出現(xiàn)了亂序, 難道說printf這一行代碼還沒執(zhí)行完就跑去執(zhí)行其它行的代碼了嗎? 說得沒錯. 如果僅僅是打印輸出itemCount的值, 那是不會出現(xiàn)問題的, 問題就出現(xiàn)在這個++itemCount. 它看上去是一句代碼, 其實CPU在執(zhí)行的時候, 是分成多個步驟執(zhí)行的, 也即++itemCount并不是原子操作, 原子操作就是不可細(xì)分的操作, 只有原子操作的執(zhí)行在多線程(進(jìn)程)中不會被打亂, 這個讀者可以另行了解. 而打亂導(dǎo)致的結(jié)果就是兩個生產(chǎn)者線程爭相對itemCount進(jìn)行自增操作, 從而出現(xiàn)了亂序的情況.?很可惜, 既然++itemCount不是原子操作, 我們就只能想辦法, 讓它像原子操作那樣不被打亂. 要想這個操作不被打亂 就需要對線程進(jìn)行一些限制, 不能讓它們在兩個線程直接隨意切換執(zhí)行, 至少要完成地把printf語句執(zhí)行完才能切換線程.?
? ? ? ? 這是第一個難點, 怎么才能保證完整執(zhí)行完++itemCount這個操作呢? 我們可以想象這樣一個場景: 把itemCount鎖在一個屋子里, 屋子的鎖只有一把鑰匙, 最初鎖與鑰匙都在門外, 每當(dāng)有人要進(jìn)入屋子對itemCount進(jìn)行操作, 就用這把鑰匙開門, 然后鎖和鑰匙自己保管帶入屋子, 然后在屋子里將門反鎖, 對itemCount進(jìn)行操作之后, 就可以出門了, 然后歸還鎖和鑰匙. 這樣就可以保證自己在操作itemCount時沒有其它人進(jìn)入屋子, 也即++itemCount這個操作不會被打斷. 這就是一個互斥操作.?
? ? ? ? 怎么用C++去創(chuàng)建這個鎖和鑰匙呢? 我們可以用mutex來創(chuàng)建. 代碼如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
std::mutex mtx; //互斥量, 即鎖和鑰匙
/// <summary>
/// 生產(chǎn)者函數(shù)1
/// </summary>
void producer_fun1() {
while (true)
{
mtx.lock(); //進(jìn)入屋子, 上鎖
printf("生產(chǎn)者1生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //當(dāng)前線程阻塞1秒
mtx.unlock(); //從屋子出去, 解鎖, 歸還鎖和鑰匙
}
}
/// <summary>
/// 生產(chǎn)者函數(shù)2
/// </summary>
void producer_fun2() {
while (true)
{
mtx.lock(); //進(jìn)入屋子, 上鎖
printf("生產(chǎn)者2生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //當(dāng)前線程阻塞1秒
mtx.unlock(); //從屋子出去, 解鎖, 歸還鎖和鑰匙
}
}
int main()
{
std::thread producer_thread1(producer_fun1); //生產(chǎn)者線程1
std::thread producer_thread2(producer_fun2); //生產(chǎn)者線程2
producer_thread1.join(); //等待線程結(jié)束
producer_thread2.join();
return 0;
}
代碼4: 兩個生產(chǎn)者(加了鎖, 以解決互斥問題)
? ? ? ? 運(yùn)行一下看看:
?????????可以看出, 我們的產(chǎn)品數(shù)量是按序遞增的, 因此互斥問題得到了解決. 你可能會問, 為啥生產(chǎn)者2"進(jìn)入屋子"這么多次, 而生產(chǎn)者1"進(jìn)入屋子"的次數(shù)這么少? 這說明生產(chǎn)者1遲遲得不到對itemCount的操作權(quán)限, 產(chǎn)生了饑餓現(xiàn)象, 這個不是我們目前研究的重點, 后面再討論. 我們最主要的目的--解決互斥問題--已經(jīng)實現(xiàn)了.?
? ? ? ? 再看一下一個生產(chǎn)者和一個消費(fèi)者的情況:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
std::mutex mtx; //互斥量, 即鎖和鑰匙
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun() {
while (true)
{
mtx.lock(); //進(jìn)屋, 上鎖
printf("生產(chǎn)者生產(chǎn)產(chǎn)品: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock(); //出去, 解鎖
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun() {
while (true)
{
mtx.lock(); //進(jìn)屋, 上鎖
if (itemCount > 0)
{
printf("消費(fèi)者消費(fèi)產(chǎn)品: %d\n", --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
mtx.unlock(); //出去, 解鎖
}
}
int main()
{
std::thread producer_thread(producer_fun); // 創(chuàng)建一個新線程,傳入函數(shù)和參數(shù)
std::thread consumer_thread(consumer_fun);
producer_thread.join(); //等待線程結(jié)束
producer_thread.join();
return 0;
}
代碼5: 一個生產(chǎn)者, 一個消費(fèi)者(加了鎖, 以解決互斥問題)
????????這里我們限制itemCount大于0時才能消費(fèi)產(chǎn)品, 否則不合邏輯, 運(yùn)行結(jié)果如下:?
????????可以看出, 加鎖的方法同樣可以解決一個生產(chǎn)者和一個消費(fèi)者的互斥問題.
4. 同步問題引入
? ? ? ? 現(xiàn)在我們看看一個生產(chǎn)者, 兩個消費(fèi)者的情況, 我們規(guī)定產(chǎn)品池最多可以容納10個產(chǎn)品:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //互斥量, 即鎖和鑰匙
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun() {
while (true)
{
mtx.lock();
if (itemCount < MAX_VALUE)
{
printf("生產(chǎn)者 生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
mtx.unlock();
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
mtx.lock();
if (itemCount > 0)
{
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
mtx.unlock();
}
}
int main()
{
std::thread producer_thread(producer_fun); //生產(chǎn)者線程
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼6: 一個生產(chǎn)者, 兩個消費(fèi)者(只加了鎖, 以解決互斥問題)
????????這里我們?nèi)允褂胢utex來限制每個線程對itemCount的訪問, 運(yùn)行一下看看:
? ? ? ? ?可以看出, 我們的產(chǎn)品數(shù)量仍然可以按照生產(chǎn)者和消費(fèi)者的操作進(jìn)行數(shù)量的增加或者減少, 但是又有新的問題出現(xiàn)了: 消費(fèi)者中只有消費(fèi)者1在消費(fèi)產(chǎn)品, 消費(fèi)者2一個產(chǎn)品也沒有消費(fèi). 即消費(fèi)者2產(chǎn)生了饑餓現(xiàn)象. (注意, 這里的饑餓現(xiàn)象不一定百分之百發(fā)生, 具體要看操作系統(tǒng)的資源調(diào)度, 如下圖是同一段代碼的運(yùn)行結(jié)果)
? ? ? ? 這里我們?nèi)匀粫翰豢紤]饑餓問題, 根據(jù)輸出結(jié)果來看, 我們似乎很好地解決了生產(chǎn)者和消費(fèi)者問題, 雖然代碼6只展示了一個生產(chǎn)者和兩個消費(fèi)者的問題, 但多個生產(chǎn)者和多個消費(fèi)者只需要根據(jù)代碼6的格式多開啟幾個線程即可, 我們對產(chǎn)品生產(chǎn)和消費(fèi)操作進(jìn)行了很好的保護(hù), 使得它們不會被打斷. 那生產(chǎn)者和消費(fèi)者問題我們是不是就已經(jīng)解決了呢? 其實不然. 代碼6還存在優(yōu)化的空間.
? ? ? ? 哪個地方需要優(yōu)化? 以生產(chǎn)者函數(shù)為例, 在生產(chǎn)者進(jìn)入屋子以后, 它會判斷當(dāng)前產(chǎn)品的數(shù)量, 如果產(chǎn)品池已經(jīng)滿了, 不能容納更多的產(chǎn)品, 則生產(chǎn)者進(jìn)屋子以后就不再生產(chǎn)產(chǎn)品了; 同理, 消費(fèi)者進(jìn)入屋子以后, 如果屋子里面沒有產(chǎn)品, 則消費(fèi)者也不能消費(fèi). 問題就出現(xiàn)在這里. 生產(chǎn)者和消費(fèi)者都是進(jìn)了屋子以后才知道自己能否生產(chǎn)或者消費(fèi), 這可能導(dǎo)致生產(chǎn)者和消費(fèi)者進(jìn)了屋子以后什么也沒干, 這就導(dǎo)致了計算機(jī)性能的浪費(fèi). 此外, 事實上, 對itemCount的操作的代碼我們稱之為臨界區(qū), 臨界區(qū)的代碼應(yīng)該盡可能少. 怎么解決? 我們還是先回到一個生產(chǎn)者和一個消費(fèi)者的情況, 寫出代碼:?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //互斥量, 即鎖和鑰匙
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生產(chǎn)者 生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun() {
while (true)
{
if (itemCount > 0)
{
mtx.lock();
printf("消費(fèi)者 消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生產(chǎn)者線程
std::thread consumer_thread(consumer_fun); //消費(fèi)者線程
producer_thread.join(); //等待線程結(jié)束
consumer_thread.join();
return 0;
}
代碼7: 一個生產(chǎn)者, 一個消費(fèi)者(加鎖之前先判斷產(chǎn)品數(shù)量)
? ? ? ? 運(yùn)行結(jié)果如下:
? ? ? ? ?看上去沒什么問題, 因為只有一個生產(chǎn)者和一個消費(fèi)者. 那一個生產(chǎn)者兩個消費(fèi)者的情況呢? 先寫一下代碼:?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //互斥量, 即鎖和鑰匙
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生產(chǎn)者 生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
//std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
if (itemCount > 0)
{ //花括號1
mtx.lock();
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, --itemCount);
//std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生產(chǎn)者線程
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼8: 一個生產(chǎn)者, 兩個消費(fèi)者(加鎖之前先判斷數(shù)量)
? ? ? ? 這里為了運(yùn)行得快一點, 我們?nèi)∠俗枞€程的操作, 然后看一下部分的運(yùn)行結(jié)果:
? ? ? ? ?結(jié)果出現(xiàn)了問題: 消費(fèi)者2在消費(fèi)產(chǎn)品時, 產(chǎn)品數(shù)量由0變?yōu)榱?1, 這顯然不符合邏輯. 為什么會出現(xiàn)這種情況? 思考這樣一種情況: 在代碼8里面, 若此時itemCount為1, 消費(fèi)者線程2執(zhí)行到了花括號1的位置, 然后發(fā)生調(diào)度, 轉(zhuǎn)而跳轉(zhuǎn)到了消費(fèi)者1的線程, 它也執(zhí)行到了花括號1的位置, 然后消費(fèi)者1順利地進(jìn)入屋子消費(fèi)產(chǎn)品, 此時itemCount為0; 然后再次發(fā)生了調(diào)度, 轉(zhuǎn)而去執(zhí)行消費(fèi)者2, 它繼續(xù)從花括號1的位置繼續(xù)執(zhí)行, 它也順利進(jìn)入屋子消費(fèi)產(chǎn)品, 則itemCount繼續(xù)自減1, 變?yōu)榱?1. 問題就出現(xiàn)在這里.
? ? ? ? 怎么辦, 要不我們退回到代碼6, 然后假裝無事發(fā)生? 不行, 我們探討過, 代碼6有優(yōu)化的空間. 因此我們需要解決當(dāng)前的問題. 問題出在哪? 問題在于對itemCount的判斷并沒有受到限制, itemCount >?0和itemCount < MAX_VALUE是進(jìn)入屋子的條件, 但消費(fèi)者1和消費(fèi)者2在進(jìn)屋子之前是沒有受到限制的, 當(dāng)滿足進(jìn)屋條件時, 它倆會爭相進(jìn)入屋子, 一點素質(zhì)也沒有, 結(jié)果就出現(xiàn)了問題. 我們不希望它倆爭搶, 我們要對同一類線程進(jìn)行管理, 要讓它們"排隊", 即搞一個線程管理的隊列.?只有在生產(chǎn)者生產(chǎn)完產(chǎn)品之后, 才會通知這個隊列, 讓這個隊列中的一個線程進(jìn)入屋子. 在C++中, 可以用condition_variable來管理這個線程隊列. 代碼如下:?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生產(chǎn)者 生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //喚醒一個消費(fèi)者線程
}
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
if (itemCount <= 0)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck); //如果產(chǎn)品數(shù)量<=0, 則阻塞當(dāng)前線程, 等待生產(chǎn)者發(fā)出喚醒的信號
}
if (itemCount > 0)
{ //花括號1
mtx.lock();
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生產(chǎn)者線程
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼9: 一個生產(chǎn)者, 兩個消費(fèi)者(引入條件變量)
????????如上所示, 先看消費(fèi)者的函數(shù), 在消費(fèi)者進(jìn)入屋子之前, 先判斷產(chǎn)品數(shù)量, 小于等于0則將消費(fèi)者線程阻塞. 等待生產(chǎn)者生產(chǎn)產(chǎn)品之后發(fā)出信號, 才能喚醒一個消費(fèi)者線程繼續(xù)執(zhí)行. 看上去似乎沒啥問題, 跑一下看看:?
? ? ? ? ?還是有問題, 怎么回事呢? 思考這樣一種情況: 還是看花括號1的位置, 最開始的時候, 消費(fèi)者線程一個也沒有被阻塞, 然后生產(chǎn)者生產(chǎn)了一個產(chǎn)品, 然后發(fā)生調(diào)度, 線程消費(fèi)者1開始執(zhí)行, 由于itemCount == 1, 因此消費(fèi)者1不會被阻塞, 執(zhí)行到了花括號1的位置; 此時再次發(fā)生調(diào)度, 轉(zhuǎn)而執(zhí)行消費(fèi)者2, 由于itemCount的值仍為1, 因此消費(fèi)者2也不會被阻塞, 它也執(zhí)行到了花括號1的位置. 此時繼續(xù)執(zhí)行消費(fèi)者1, 它進(jìn)屋了, itmeCount自減1值為0, 然后出來了;?再次調(diào)度執(zhí)行消費(fèi)者2, 它也可以進(jìn)屋, itmeCount繼續(xù)自減1值為-1. 這就是問題所在.?
? ? ? ? 解決辦法也很容易想到, 問題在于阻塞消費(fèi)者線程的時候, 我們是有條件的阻塞, 這樣看來不行, 這個阻塞應(yīng)該是無條件的, 也即只要消費(fèi)者想進(jìn)屋子, 就必須先阻塞自己, 也即先排隊, 等生產(chǎn)者發(fā)信號過來, 你才能考慮進(jìn)屋子的事情. 代碼修改如下:?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生產(chǎn)者 生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //喚醒一個消費(fèi)者線程
}
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck); //阻塞當(dāng)前線程, 等待生產(chǎn)者發(fā)出喚醒的信號
if (itemCount > 0)
{ //花括號1
mtx.lock();
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生產(chǎn)者線程
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼10: 一個生產(chǎn)者, 兩個消費(fèi)者(消費(fèi)者直接阻塞)
????????運(yùn)行結(jié)果如下:
? ? ? ? ?完美運(yùn)行, 沒有問題. 代碼10還可以怎么優(yōu)化? 我們可以將itemCount > 0這個條件作為喚醒消費(fèi)者線程的條件, 這樣就不用在喚醒消費(fèi)者線程之后再進(jìn)程一次判斷了, 代碼如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生產(chǎn)者 生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //喚醒一個消費(fèi)者線程
}
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return itemCount > 0; }); //阻塞當(dāng)前線程, 等待生產(chǎn)者發(fā)出喚醒的信號
mtx.lock();
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
int main()
{
std::thread producer_thread(producer_fun); //生產(chǎn)者線程
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
?代碼11: 一個生產(chǎn)者, 兩個消費(fèi)者(消費(fèi)者先阻塞, itemCount > 0時繼續(xù)執(zhí)行)
? ? ? ? 這段代碼也是沒有問題的, 讀者可以自行嘗試. 需要注意代碼9與代碼11的wait語句的區(qū)別, 看上似乎差不多, 但是其實是不一樣的, 代碼9可能不會阻塞消費(fèi)者線程, 但代碼11會先將消費(fèi)者線程阻塞掉,?itemCount > 0時繼續(xù)執(zhí)行.?
5. 生產(chǎn)者和消費(fèi)者問題
? ? ? ? 至此, 解決生產(chǎn)者和消費(fèi)者問題, 想必應(yīng)該是水到渠成的事了無非就是多個生產(chǎn)者和多個消費(fèi)者, 這里我們假設(shè)有兩個生產(chǎn)者和兩個消費(fèi)者. 我們已經(jīng)寫出了一個生產(chǎn)者和兩個消費(fèi)者的代碼, 兩個生產(chǎn)者和兩個消費(fèi)者的代碼如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int itemCount = 0;
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::mutex producer_mtx; //用于管理生產(chǎn)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
std::condition_variable producer_cv; //條件變量, 可以視為生產(chǎn)者線程管理隊列
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return itemCount < MAX_VALUE; }); //阻塞當(dāng)前線程, 等待消費(fèi)者發(fā)出喚醒的信號
mtx.lock();
printf("生產(chǎn)者%d生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //喚醒一個消費(fèi)者線程
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return itemCount > 0; }); //阻塞當(dāng)前線程, 等待生產(chǎn)者發(fā)出喚醒的信號
mtx.lock();
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
producer_cv.notify_one(); //喚醒一個生產(chǎn)者線程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生產(chǎn)者線程1
std::thread producer_thread2(producer_fun, 2); //生產(chǎn)者線程2
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread1.join(); //等待線程結(jié)束
producer_thread2.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼12: 多個生產(chǎn)者, 多個消費(fèi)者
????????這段代碼也可以正常運(yùn)行, 讀者可以自行嘗試. 至此, 我們已經(jīng)解決了多個生產(chǎn)者和多個消費(fèi)者的同步與互斥問題. 如果你覺得這跟你在書上看到的P(), V()操作不太一樣的話, 別急, 我們可以對代碼稍作修改, 使它變成我們熟悉的樣子.?
? ? ? ? 首先我們需要用別的值來代替itemCount, 可以這樣思考: 消費(fèi)者消耗產(chǎn)品, 生產(chǎn)者消耗產(chǎn)品池中的產(chǎn)品位置. 用empty記錄空位置的個數(shù), 則empty初值為10; 用full記錄產(chǎn)品個數(shù), 則full初值為0. 代碼改寫如下:?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的數(shù)量, 初始值為MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::mutex producer_mtx; //用于管理生產(chǎn)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
std::condition_variable producer_cv; //條件變量, 可以視為生產(chǎn)者線程管理隊列
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return empty > 0; }); //阻塞當(dāng)前線程, 等待消費(fèi)者發(fā)出喚醒的信號, 只要空位置個數(shù) > 0, 就喚醒一個生產(chǎn)者線程
mtx.lock();
--empty;
++full; //注意這里要同時改變full的值, 否則生產(chǎn)者生產(chǎn)完10個產(chǎn)品之后, 消費(fèi)者的喚醒條件full > 0始終不滿足, 程序就無法繼續(xù)執(zhí)行了
printf("生產(chǎn)者%d生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, MAX_VALUE - empty);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //喚醒一個消費(fèi)者線程
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return full > 0; }); //阻塞當(dāng)前線程, 等待生產(chǎn)者發(fā)出喚醒的信號, 只要產(chǎn)品個數(shù) > 0, 就喚醒一個消費(fèi)者線程
mtx.lock();
--full;
++empty; //同理, 這里也要同時改變empty的值
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, full);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
producer_cv.notify_one(); //喚醒一個生產(chǎn)者線程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生產(chǎn)者線程1
std::thread producer_thread2(producer_fun, 2); //生產(chǎn)者線程2
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread1.join(); //等待線程結(jié)束
producer_thread2.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼13: 多個生產(chǎn)者, 多個消費(fèi)者(empty記錄空位置個數(shù), full記錄產(chǎn)品個數(shù))
? ? ? ? 如上所示, 產(chǎn)品是消費(fèi)者消耗的資源, 空位置是生產(chǎn)者消耗的資源. 生產(chǎn)者和消費(fèi)者的代碼執(zhí)行之前先將自己阻塞, 等條件滿足才喚醒一個對應(yīng)的進(jìn)程:?empty > 0時喚醒一個生產(chǎn)者,?full > 0時喚醒一個消費(fèi)者. 我們再思考一下, 能不能把mtx.lock();這句代碼也統(tǒng)一寫成wait的形式呢? 答案是可以. 怎么改寫呢? 我們可以把鎖與鑰匙也當(dāng)作一種資源, 不過這個資源生產(chǎn)者和消費(fèi)者都可以消耗, 而且出屋子以后就要?dú)w還這個資源. 這個資源的初始值是多少? 顯然是1, 因為只有一把鎖與鑰匙. 代碼改寫如下:?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的數(shù)量, 初始值為MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
/// <summary>
/// 互斥信號量, 初值為1
/// </summary>
int mutex = 1;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::mutex producer_mtx; //用于管理生產(chǎn)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
std::condition_variable producer_cv; //條件變量, 可以視為生產(chǎn)者線程管理隊列
std::condition_variable producer_and_consumer_cv; //條件變量, 可以視為生產(chǎn)者和消費(fèi)者線程管理隊列
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return empty > 0; }); //阻塞當(dāng)前線程, 等待消費(fèi)者發(fā)出喚醒的信號, 只要空位置個數(shù) > 0, 就喚醒一個生產(chǎn)者線程
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞當(dāng)前線程, 等待其它線程發(fā)出喚醒的信號, 只要鎖與鑰匙個數(shù) > 0, 就喚醒該線程
--mutex;
--empty;
++full; //注意這里要同時改變full的值, 否則生產(chǎn)者生產(chǎn)完10個產(chǎn)品之后, 消費(fèi)者的喚醒條件full > 0始終不滿足, 程序就無法繼續(xù)執(zhí)行了
printf("生產(chǎn)者%d生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, MAX_VALUE - empty);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
producer_and_consumer_cv.notify_all(); //歸還鎖和鑰匙, 喚醒所有的生產(chǎn)者和消費(fèi)者線程
consumer_cv.notify_one(); //喚醒一個消費(fèi)者線程
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return full > 0; }); //阻塞當(dāng)前線程, 等待生產(chǎn)者發(fā)出喚醒的信號, 只要產(chǎn)品個數(shù) > 0, 就喚醒一個消費(fèi)者線程
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞當(dāng)前線程, 等待其它線程發(fā)出喚醒的信號, 只要鎖與鑰匙個數(shù) > 0, 就喚醒該線程
--mutex;
--full;
++empty; //同理, 這里也要同時改變empty的值
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, full);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
producer_and_consumer_cv.notify_all(); //歸還鎖和鑰匙, 喚醒所有的生產(chǎn)者和消費(fèi)者線程
producer_cv.notify_one(); //喚醒一個生產(chǎn)者線程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生產(chǎn)者線程1
std::thread producer_thread2(producer_fun, 2); //生產(chǎn)者線程2
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread1.join(); //等待線程結(jié)束
producer_thread2.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼14: 多個生產(chǎn)者, 多個消費(fèi)者(mutex記錄鎖與鑰匙的數(shù)量)
? ? ? ? 至此, 我們對empty, full, mutex的操作完成了形式上的統(tǒng)一. 接下來就可以提取我們需要的P(), V()操作了. 代碼如下:?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的數(shù)量, 初始值為MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
/// <summary>
/// 互斥信號量, 初值為1
/// </summary>
int mutex = 1;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::mutex producer_mtx; //用于管理生產(chǎn)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
std::condition_variable producer_cv; //條件變量, 可以視為生產(chǎn)者線程管理隊列
std::condition_variable producer_and_consumer_cv; //條件變量, 可以視為生產(chǎn)者和消費(fèi)者線程管理隊列
/// <summary>
/// P操作
/// </summary>
void P(std::mutex& mtx, std::condition_variable& cv, int& num) {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&] {return num > 0; }); //阻塞當(dāng)前線程, 等待其它線程發(fā)出喚醒的信號, 只要num > 0, 就喚醒一個線程
}
/// <summary>
/// V操作
/// </summary>
void V(std::condition_variable& cv) {
cv.notify_all();
}
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun(int index) {
while (true)
{
P(producer_mtx, producer_cv, empty);
P(mtx, producer_and_consumer_cv, mutex);
--mutex;
--empty;
++full; //注意這里要同時改變full的值, 否則生產(chǎn)者生產(chǎn)完10個產(chǎn)品之后, 消費(fèi)者的喚醒條件full > 0始終不滿足, 程序就無法繼續(xù)執(zhí)行了
printf("生產(chǎn)者%d生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, MAX_VALUE - empty);
++mutex;
//std::this_thread::sleep_for(std::chrono::seconds(1));
V(producer_and_consumer_cv);
V(consumer_cv);
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
P(consumer_mtx, consumer_cv, full);
P(mtx, producer_and_consumer_cv, mutex);
--mutex;
--full;
++empty; //同理, 這里也要同時改變empty的值
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, full);
++mutex;
//std::this_thread::sleep_for(std::chrono::seconds(1));
V(producer_and_consumer_cv);
V(producer_cv);
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生產(chǎn)者線程1
std::thread producer_thread2(producer_fun, 2); //生產(chǎn)者線程2
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread1.join(); //等待線程結(jié)束
producer_thread2.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼15: 多個生產(chǎn)者, 多個消費(fèi)者(提取出P, V操作, 運(yùn)行結(jié)果有誤)
? ? ? ? 如上所示, 我們提取出了P, V操作, 運(yùn)行會發(fā)現(xiàn)運(yùn)行結(jié)果是錯誤的, 又回到了最初的亂序狀態(tài), 怎么回事呢? 我們打個斷點就會發(fā)現(xiàn), 在提取了P, V操作之后, 線程的執(zhí)行順序又被打亂了, 原來如此, 因為我們的P, V操作不是原子操作, 它是會被打亂了, 有可能P里面的代碼還沒運(yùn)行就轉(zhuǎn)而執(zhí)行別的代碼了. 你可能會問, 那為什么資料上面可以寫P, V操作呢? 那是因為書上面寫的是偽代碼, 默認(rèn)了P, 和V是原子操作. 那我們現(xiàn)在怎么解決目前的P, V操作會被打斷的問題呢? 再加一個鎖? 加了鎖之后再提取P和V, 然后又會被打斷執(zhí)行, 然后再加一個鎖......無限套娃肯定是不行的, 所以我們這里不追求形式上的一致了, 代碼14就可以作為生產(chǎn)者和消費(fèi)者問題的最終代碼. 至于我們?nèi)绾卧诓皇褂胢utex的情況下實現(xiàn)互斥訪問, 這個以后再分析. 至此, 我們已經(jīng)解決了生產(chǎn)者和消費(fèi)者問題. 最終代碼如下:?文章來源:http://www.zghlxwxcb.cn/news/detail-518277.html
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 產(chǎn)品池可以容納的產(chǎn)品數(shù)量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 產(chǎn)品的數(shù)量, 初始值為0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的數(shù)量, 初始值為MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
/// <summary>
/// 互斥信號量, 初值為1
/// </summary>
int mutex = 1;
std::mutex mtx; //用于訪問itemCount的互斥量, 即鎖和鑰匙
std::mutex consumer_mtx; //用于管理消費(fèi)者線程隊列的互斥量, 即鎖和鑰匙
std::mutex producer_mtx; //用于管理生產(chǎn)者線程隊列的互斥量, 即鎖和鑰匙
std::condition_variable consumer_cv; //條件變量, 可以視為消費(fèi)者線程管理隊列
std::condition_variable producer_cv; //條件變量, 可以視為生產(chǎn)者線程管理隊列
std::condition_variable producer_and_consumer_cv; //條件變量, 可以視為生產(chǎn)者和消費(fèi)者線程管理隊列
/// <summary>
/// 生產(chǎn)者函數(shù)
/// </summary>
void producer_fun(int index) {
while (true)
{
//P(empty)
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return empty > 0; }); //阻塞當(dāng)前線程, 等待消費(fèi)者發(fā)出喚醒的信號, 只要空位置個數(shù) > 0, 就喚醒一個生產(chǎn)者線程
//P(mutex)
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞當(dāng)前線程, 等待其它線程發(fā)出喚醒的信號, 只要鎖與鑰匙個數(shù) > 0, 就喚醒該線程
//臨界區(qū)代碼
--mutex;
--empty;
++full; //注意這里要同時改變full的值, 否則生產(chǎn)者生產(chǎn)完10個產(chǎn)品之后, 消費(fèi)者的喚醒條件full > 0始終不滿足, 程序就無法繼續(xù)執(zhí)行了
printf("生產(chǎn)者%d生產(chǎn)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, MAX_VALUE - empty);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
//V(mutex)
producer_and_consumer_cv.notify_all(); //歸還鎖和鑰匙, 喚醒所有的生產(chǎn)者和消費(fèi)者線程
//V(full)
consumer_cv.notify_one(); //喚醒一個消費(fèi)者線程
}
}
/// <summary>
/// 消費(fèi)者函數(shù)
/// </summary>
void consumer_fun(int index) {
while (true)
{
//P(full)
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return full > 0; }); //阻塞當(dāng)前線程, 等待生產(chǎn)者發(fā)出喚醒的信號, 只要產(chǎn)品個數(shù) > 0, 就喚醒一個消費(fèi)者線程
//P(mutex)
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞當(dāng)前線程, 等待其它線程發(fā)出喚醒的信號, 只要鎖與鑰匙個數(shù) > 0, 就喚醒該線程
//臨界區(qū)代碼
--mutex;
--full;
++empty; //同理, 這里也要同時改變empty的值
printf("消費(fèi)者%d消費(fèi)產(chǎn)品, 產(chǎn)品數(shù)量: %d\n", index, full);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
//V(mutex)
producer_and_consumer_cv.notify_all(); //歸還鎖和鑰匙, 喚醒所有的生產(chǎn)者和消費(fèi)者線程
//V(empty)
producer_cv.notify_one(); //喚醒一個生產(chǎn)者線程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生產(chǎn)者線程1
std::thread producer_thread2(producer_fun, 2); //生產(chǎn)者線程2
std::thread consumer_thread1(consumer_fun, 1); //消費(fèi)者線程1
std::thread consumer_thread2(consumer_fun, 2); //消費(fèi)者線程2
producer_thread1.join(); //等待線程結(jié)束
producer_thread2.join(); //等待線程結(jié)束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代碼16: 多個生產(chǎn)者, 多個消費(fèi)者(生產(chǎn)者消費(fèi)者問題最終代碼)文章來源地址http://www.zghlxwxcb.cn/news/detail-518277.html
到了這里,關(guān)于生產(chǎn)者與消費(fèi)者問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!