由故事引入模型
故事背景
有一個(gè)小朋友叫小C,他住的地方?jīng)]有超市,只有幾家供貨商,因?yàn)槊考夜┴浬填愋蛦我?,買東西還要跑來跑去的,而且供貨商晚上還不開門,買東西特別不方便,不僅小C覺得麻煩,其他人也覺得麻煩。小C想:為什么不能把這幾家供貨商的東西先放在一個(gè)地方呢,再由幾個(gè)人專門賣,需要什么就直接挑選就好了,不用跑來跑去的,營業(yè)時(shí)間甚至可以全天。于是乎,小C就打電話給了市長,提了這個(gè)建議。市長知道了這個(gè)地方的市民買東西特別不方便,就接受了這個(gè)建議,于是就在這個(gè)地方建了個(gè)超市。
從此以后,小C和市民們買東西變得方便了,幾家供貨商把各種類型的商品送進(jìn)超市,市民們只需要在超市進(jìn)行挑選就可以了。大大節(jié)省了市民的時(shí)間,供應(yīng)商也提高了工作效率,一次生產(chǎn)大批量的貨物送進(jìn)超市就好了,在貨物充足時(shí),供應(yīng)商也能得到很好的休息,等貨物缺乏再送過去。
由故事抽象出來的模型:
小C和市民們都是消費(fèi)者,而供應(yīng)商是生產(chǎn)者,超市是一種交易場所,為第三方
這就是生產(chǎn)者消費(fèi)者模型,計(jì)算機(jī)中,生產(chǎn)者和消費(fèi)者都是線程,第三方是一種特定數(shù)據(jù)結(jié)構(gòu)的緩沖區(qū)。
線程之間想要通信,緩沖區(qū)一定要被所有線程看到, 也就是說緩沖區(qū)一定會被多線程并發(fā)訪問, 那么緩沖區(qū)就要保護(hù)共享資源的安全,維護(hù)線程互斥與同步的關(guān)系。
供貨商們的矛盾
由于超市的空間是有限的,這讓供貨商之間開始慢慢較量了,誰都想讓自己的貨物在超市多放一點(diǎn)。一天,供貨商小S和供貨商小D同時(shí)來超市放置自己的貨物了。剛好超市這天只能放一家供貨商的貨物了,于是小S和小D就吵起來了。小S:“這塊地方只有我能放貨物,你不能放”。小D不服了:“憑什么只有你能放,我不能放?”于是兩家供貨商就大吵大鬧,鬧得沸沸揚(yáng)揚(yáng)的,不過這也不是一天兩天的事了。超市知道了這件事后,就制定了一個(gè)叫做“鎖”的規(guī)則:我這里有一把象征性的鎖和鑰匙,每天,誰能先拿到鎖,誰就先放貨物,放完后就解鎖,下一次你們再繼續(xù)競爭這把鎖。
供貨商是生產(chǎn)者,那么生產(chǎn)者和生產(chǎn)者之間的關(guān)系是競爭的關(guān)系。再極端一點(diǎn),在線程中,我們叫互斥關(guān)系,同一時(shí)間一次只能執(zhí)行一個(gè)線程。
市民們和供貨商之間的矛盾一
小C早上想去超市買幾箱可樂,很不巧超市沒可樂了。于是小C過了一兩小時(shí)又去超市問有可樂了嗎,超市說沒有。再過幾個(gè)小時(shí),小C再去,還是沒可樂,過一會又去,還是沒有。超市見小C頻繁地來也不是個(gè)辦法,就想了一個(gè)辦法:你不要頻繁的來了,你給我你的聯(lián)系方式,等供貨商送貨來了,我再打電話給你。小C答應(yīng)了這種請求。
超市想起前幾個(gè)星期,超市貨滿放不下貨物的時(shí)候,供貨商也頻繁地送貨物來,每次都灰溜溜地回去了。于是超市也打電話對供貨商說:你不要頻繁地來了,你給我你的聯(lián)系方式,等貨物缺了,我再打電話給你,你再來。
小C想買可樂,但是超市沒貨,卻隔一會就來問超市有貨物嗎。
供貨商想送貨進(jìn)超市,但是超市貨滿了,卻隔一會就問超市能進(jìn)貨了嗎
這種可以抽象成線程的的頻繁檢測。
超市想出來的方案:等有貨了再聯(lián)系小C,等沒貨了再聯(lián)系供應(yīng)商。
可以抽象成緩沖區(qū)維護(hù)了生產(chǎn)者和消費(fèi)者的同步關(guān)系,維護(hù)了線程之間的同步關(guān)系,讓線程之間對第三方不再頻繁的檢測。
市民們和供貨商之間的矛盾二
小C終于能去超市買可樂了,此時(shí)供貨商小S想在這個(gè)地方放貨物。由于超市空間限制,只能一個(gè)人在這里。小C:"讓我先買東西,你再放。"供貨商小S又不服氣了:“上次是我的同行和我搶,這次怎么到你了?,讓我先放”。兩個(gè)人誰也不服誰。于是,“鎖”規(guī)則又可以用起來了。
小C是消費(fèi)者,供貨商是生產(chǎn)者,生產(chǎn)者和消費(fèi)者之間也有互斥關(guān)系。
市民們的矛盾
小C好不容易能買可樂了,可是小N來了,他也想買這幾箱可樂。
于是小C又和小N吵起來了,之前制定的“鎖”規(guī)則又起效果了。
小C和小N都是消費(fèi)者,消費(fèi)者和消費(fèi)者之間是“互斥關(guān)系”
模型總結(jié)
- 三種關(guān)系:生產(chǎn)者和生產(chǎn)者之間的關(guān)系(互斥),生產(chǎn)者和消費(fèi)者之間的關(guān)系(互斥與同步),消費(fèi)者和消費(fèi)者之間關(guān)系(互斥)
- 兩種角色:生產(chǎn)者和消費(fèi)者
- 一個(gè)交易場所:通常是緩沖區(qū)
生產(chǎn)者消費(fèi)者模型
為什么要使用生產(chǎn)者消費(fèi)者模型?
生產(chǎn)者消費(fèi)者模式就是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個(gè)阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的。
生產(chǎn)者消費(fèi)者模型的特點(diǎn)
由上面的故事已經(jīng)進(jìn)行總結(jié)。
生產(chǎn)者消費(fèi)者模型是多線程同步與互斥的一個(gè)經(jīng)典場景,其特點(diǎn)如下:
- 三種關(guān)系: 生產(chǎn)者和生產(chǎn)者(互斥關(guān)系)、消費(fèi)者和消費(fèi)者(互斥關(guān)系)、生產(chǎn)者和消費(fèi)者(互斥關(guān)系、同步關(guān)系)。
- 兩種角色: 生產(chǎn)者和消費(fèi)者。(通常由進(jìn)程或線程承擔(dān))
- 一個(gè)交易場所: 通常指的是內(nèi)存中的一段緩沖區(qū)。(可以自己通過某種方式組織起來)
在編寫生產(chǎn)者消費(fèi)者代碼的時(shí)候,本質(zhì)就上就是對三種特點(diǎn)進(jìn)行維護(hù)。
生產(chǎn)者和生產(chǎn)者、消費(fèi)者和消費(fèi)者、生產(chǎn)者和消費(fèi)者,它們之間為什么會存在互斥關(guān)系?
介于生產(chǎn)者和消費(fèi)者之間的容器可能會被多個(gè)執(zhí)行流同時(shí)訪問,因此我們需要將該臨界資源用互斥鎖保護(hù)起來。
其中,所有的生產(chǎn)者和消費(fèi)者都會競爭式的申請鎖,因此生產(chǎn)者和生產(chǎn)者、消費(fèi)者和消費(fèi)者、生產(chǎn)者和消費(fèi)者之間都存在互斥關(guān)系。
生產(chǎn)者和消費(fèi)者之間為什么會存在同步關(guān)系?
- 如果讓生產(chǎn)者一直生產(chǎn),那么當(dāng)生產(chǎn)者生產(chǎn)的數(shù)據(jù)將容器塞滿后,生產(chǎn)者再生產(chǎn)數(shù)據(jù)就會生產(chǎn)失敗。
- 反之,讓消費(fèi)者一直消費(fèi),那么當(dāng)容器當(dāng)中的數(shù)據(jù)被消費(fèi)完后,消費(fèi)者再進(jìn)行消費(fèi)就會消費(fèi)失敗。
雖然這樣不會造成任何數(shù)據(jù)不一致的問題,但是這樣會引起另一方的饑餓問題,是非常低效的。我們應(yīng)該讓生產(chǎn)者和消費(fèi)者訪問該容器時(shí)具有一定的順序性,比如讓生產(chǎn)者先生產(chǎn),然后再讓消費(fèi)者進(jìn)行消費(fèi)。
注意: 互斥關(guān)系保證的是數(shù)據(jù)的正確性,而同步關(guān)系是為了讓多線程之間協(xié)同起來。
生產(chǎn)者消費(fèi)者模型優(yōu)點(diǎn)
- 解耦(生產(chǎn)者只負(fù)責(zé)生產(chǎn),消費(fèi)者只負(fù)責(zé)消費(fèi)者)
- 支持并發(fā)
- 支持忙閑不均?!?假設(shè)沒有緩沖區(qū),且消費(fèi)者和生產(chǎn)者的速度不匹配,則會造成CPU的浪費(fèi)。生產(chǎn)者/消費(fèi)者模型使得生產(chǎn)者/消費(fèi)者的處理能力達(dá)到一個(gè)動態(tài)的平衡。
基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
在多線程編程中阻塞隊(duì)列(Blocking Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu)。其與普通的隊(duì)列區(qū)別在于,當(dāng)隊(duì)列為空時(shí),從隊(duì)列獲取元素的操作將會被阻塞,直到隊(duì)列中被放入了元素;當(dāng)隊(duì)列滿時(shí),往隊(duì)列里存放元素的操作也會被阻塞,直到有元素被從隊(duì)列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊(duì)列進(jìn)行操作時(shí)會被阻塞)
C++ queue模擬阻塞隊(duì)列的生產(chǎn)消費(fèi)模型
為了便于理解,這里以單生產(chǎn)者,單消費(fèi)者為例。
先創(chuàng)建一個(gè)BlockingQueue類來充當(dāng)我們的緩沖區(qū)。
#pragma once
#include <iostream>
#include <queue>
const int gcap = 5;//定義為 5方便后面進(jìn)行測試
template <class T>
class BlockingQueue
{
public:
BlockingQueue(const int cap = gcap) : _capapacity = cap
{
}
!BlockingQueue()
{
}
private:
std::queue<T> _q;//隊(duì)列
int _capacity;//隊(duì)列的容量上限
};
生產(chǎn)者消費(fèi)者模型是用在多線程場景下的,所以要我們要保證它是線程安全的,要保證線程互斥和線程同步。所以要加上鎖和條件變量
- 在這個(gè)模型中,由于我們要避免生產(chǎn)者和消費(fèi)者同時(shí)訪問一份資源,只需要一把鎖就夠了。
- 但是條件變量需要兩個(gè)。我們的要求是:當(dāng)隊(duì)列為空時(shí),從隊(duì)列中獲取元素會被阻塞,直到隊(duì)列中放入了元素;當(dāng)隊(duì)列為滿時(shí),往隊(duì)列里存放元素也會被阻塞,直到隊(duì)列里有元素被取出。所以一個(gè)條件變量是不夠的,需要兩個(gè)條件變量,分別表示滿和空。
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int gcap = 5;//定義為5方便后面進(jìn)行測試
template <class T>
class BlockingQueue
{
public:
BlockingQueue(const int cap = gcap) : _capacity(cap)
{
pthread_mutex_init(&_mutex,nullptr);//初始化鎖
pthread_code_init(&_full,nullptr);//初始化條件變量
pthread_code_init(&_empty,nullptr);//初始化條件變量
}
~BlockingQueue()
{
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
std::queue<T> _q;//隊(duì)列
int _capacity;//隊(duì)列的容量上限
pthread_mutex_t _mutex;//定義鎖
pthread_cond_t _full;//條件變量,滿時(shí)生產(chǎn)者阻塞
pthread_cond_t _empty;//空時(shí)消費(fèi)者阻塞
};
判空和判滿函數(shù)
bool isFull(){return _q.size() == _capacity;}
bool isEmpty(){return _q.empty();}
首先先粗略寫一下生產(chǎn)者要完成的任務(wù):往容器里面放元素。這個(gè)時(shí)候需要判斷容器是否是滿的。
void push(const T& in)//生產(chǎn)者把元素放進(jìn)容器
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
if(isFull())//判斷容器是否為滿
{
//如果滿了就進(jìn)行等待
pthread_cond_wait(&_full,&_mutex);
}
_q.push(in);//未滿,就生產(chǎn),放進(jìn)容器
pthread_mutex_unlock();
}
這里簡單談一下pthread_cond_wait這個(gè)函數(shù)
- 我們只能在臨界區(qū)內(nèi)部,判斷臨界資源是否就緒,這就注定了在我們在當(dāng)前一定是持有鎖的。
- 要讓線程進(jìn)行休眠等待,就不能持有鎖等待。
- 這就說明,pthread_cond_wait 要有鎖的釋放能力。
- 當(dāng)線程醒來的時(shí)候,會繼續(xù)從臨界區(qū)內(nèi)部繼續(xù)運(yùn)行,因?yàn)槭窃谂R界區(qū)被切走的。
- 注定了當(dāng)線程被喚醒的時(shí)候,繼續(xù)在pthread_cond_wait 函數(shù)向后運(yùn)行,又要重新申請鎖,申請成功才會返回
接下來再粗略寫一下消費(fèi)者要做的事情:從容器取元素。如果容器為空就等待。
void pop(T* out)
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
if(isEmpty())//判斷容器是否為空
{
pthread_cond_wait(&_empty,&_mutex);//如果為空,消費(fèi)者就進(jìn)行等待
}
*out = _q.front();//不為空,就取隊(duì)列頭部元素
_q.pop();//取出以后,隊(duì)列彈出該元素
pthread_mutex_unlock(&_mutex);
}
這兩段生產(chǎn)者和消費(fèi)者各自執(zhí)行各自任務(wù)的代碼是有問題的。
- 假如生產(chǎn)者要往容器存數(shù)據(jù)的時(shí)候,判斷容器是滿的,那么就去等待了。
- 此時(shí)消費(fèi)者繼續(xù)消費(fèi),當(dāng)消費(fèi)到容器為空時(shí),消費(fèi)者又去等待了。
此時(shí)問題就是,沒人能喚醒生產(chǎn)者和消費(fèi)者。
解決方法如下:
互相喚醒對方
- 當(dāng)生產(chǎn)者能生產(chǎn)時(shí),每次都使用函數(shù)喚醒消費(fèi)者。
- 當(dāng)消費(fèi)者能消費(fèi)時(shí),每次都使用函數(shù)喚醒生產(chǎn)者。
代碼如下:
void push(const T& in)//生產(chǎn)者把元素放進(jìn)容器
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
if(isFull())//判斷容器是否為滿
{
//如果滿了就進(jìn)行等待
pthread_cond_wait(&_full,&_mutex);
}
_q.push(in);//未滿,就生產(chǎn),放進(jìn)容器
//此時(shí)可以加一些策略,比如容量為多少時(shí)就喚醒,我們這里就不加了。
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mutex);
}
void pop(const T* out)
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
if(isEmpty())//判斷容器是否為空
{
pthread_cond_wait(&_empty,&_mutex);//如果為空,消費(fèi)者就進(jìn)行等待
}
*out = _q.front();//不為空,就取隊(duì)列頭部元素
_q.pop();//取出以后,隊(duì)列彈出該元素
pthread_cond_signal(&_full);
pthread_mutex_unlock(&_mutex);
}
目前代碼如下:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
const int gcap = 5;//定義為5方便后面進(jìn)行測試
template <class T>
class BlockingQueue
{
public:
BlockingQueue(const int cap = gcap) : _capacity(cap)
{
pthread_mutex_init(&_mutex,nullptr);//初始化鎖
pthread_cond_init(&_full,nullptr);//初始化條件變量
pthread_cond_init(&_empty,nullptr);//初始化條件變量
}
bool isFull(){return _q.size() == _capacity;}
bool isEmpty(){return _q.empty();}
void push(const T& in)//生產(chǎn)者把元素放進(jìn)容器
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
if(isFull())//判斷容器是否為滿
{
//如果滿了就進(jìn)行等待
pthread_cond_wait(&_full,&_mutex);
}
_q.push(in);//未滿,就生產(chǎn),放進(jìn)容器
//此時(shí)可以加一些策略,比如容量為多少時(shí)就喚醒,我們這里就不加了。
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mutex);
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
if(isEmpty())//判斷容器是否為空
{
pthread_cond_wait(&_empty,&_mutex);//如果為空,消費(fèi)者就進(jìn)行等待
}
*out = _q.front();//不為空,就取隊(duì)列頭部元素
_q.pop();//取出以后,隊(duì)列彈出該元素
pthread_cond_signal(&_full);
pthread_mutex_unlock(&_mutex);
}
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
std::queue<T> _q;//隊(duì)列
int _capacity;//隊(duì)列的容量上限
pthread_mutex_t _mutex;//定義鎖
pthread_cond_t _full;//條件變量,滿時(shí)生產(chǎn)者阻塞
pthread_cond_t _empty;//空時(shí)消費(fèi)者阻塞
};
小測試
我們寫個(gè)多線程代碼測試一下
#include "block_queue.hpp"
#include <ctime>
#include <unistd.h>
using namespace std;
void* consumer(void* args)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*> (args);
while(true)
{
//1.將數(shù)據(jù)從blockqueue中獲取
int data = 0;
bq->pop(&data);
//2.結(jié)合某種業(yè)務(wù)邏輯,處理數(shù)據(jù)
//這里先打印一下
cout << "consumer data : " << data << endl;
}
}
void* producer(void* args)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*> (args);
while(true)
{
//1.生產(chǎn)者要先通過某種渠道獲得數(shù)據(jù),可以讓用戶從標(biāo)準(zhǔn)輸入輸入,也可以從網(wǎng)絡(luò)里讀
//這里我們簡單處理一下,自己創(chuàng)建隨機(jī)一些數(shù)據(jù)測試一下就行
int data = rand() % 10 + 1;
//2.將數(shù)據(jù)推送到blockqueue,完成生產(chǎn)過程
bq->push(data);
cout << "prodecer data : " << data << endl;//打印查看
}
}
int main()
{
srand((uint64_t)time(nullptr) % 100000);//測試要用的數(shù)據(jù)
//這里是為了方便理解,先寫成單生產(chǎn)單消費(fèi)
BlockingQueue<int>* bq = new BlockingQueue<int>();
pthread_t c,p;//c是消費(fèi)者線程,p是生產(chǎn)者線程
pthread_create(&c,nullptr,consumer,bq);//讓消費(fèi)者和生產(chǎn)者看到同一份隊(duì)列
pthread_create(&p,nullptr,producer,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
我們先讓消費(fèi)者的線程sleep(1),讓它消費(fèi)慢一點(diǎn)。然后生產(chǎn)者正常生產(chǎn)。
我們會發(fā)現(xiàn),因?yàn)槲覀冏铋_始容量最大為5,所有生產(chǎn)者很容易就把容器塞滿了。
塞滿以后就阻塞了,輪到消費(fèi)者消費(fèi)一個(gè),根據(jù)我們代碼所寫,每次消費(fèi)后就去喚醒生產(chǎn)者。生產(chǎn)者生產(chǎn)了,又滿了。又輪到消費(fèi)者消費(fèi)一個(gè),消費(fèi)者又喚醒生產(chǎn)者。所以會出現(xiàn)消費(fèi)一個(gè),生產(chǎn)一個(gè)的情況。
很容易觀察到,消費(fèi)者消費(fèi)的時(shí)候每次都是從隊(duì)列的頭獲得數(shù)據(jù)的。
接下來,我們讓消費(fèi)者正常消費(fèi),生產(chǎn)者線程sleep(1),生產(chǎn)慢一點(diǎn)
void* producer(void* args)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*> (args);
while(true)
{
sleep(1);
//1.生產(chǎn)者要先通過某種渠道獲得數(shù)據(jù),可以讓用戶從標(biāo)準(zhǔn)輸入輸入,也可以從網(wǎng)絡(luò)里讀
//這里我們簡單處理一下,自己創(chuàng)建隨機(jī)一些數(shù)據(jù)測試一下就行
int data = rand() % 10 + 1;
//2.將數(shù)據(jù)推送到blockqueue,完成生產(chǎn)過程
bq->push(data);
cout << "prodecer data : " << data << endl;//打印查看
}
}
還是出現(xiàn)了生產(chǎn)一個(gè)消費(fèi)一個(gè)的情況。
原因是最開始隊(duì)列是空的,生產(chǎn)者生產(chǎn)慢了,消費(fèi)者只能等待。等到生產(chǎn)者生產(chǎn)了一個(gè)以后,我們沒加任何策略,只要生產(chǎn)了就喚醒消費(fèi)者線程。然后消費(fèi)者消費(fèi)了。隊(duì)列又空了,消費(fèi)者又要等待生產(chǎn)者生產(chǎn)。
由這個(gè)小測試我們可以看到,我們成功地讓多線程協(xié)同起來了。
細(xì)節(jié)1 線程被誤喚醒的情況
現(xiàn)在是單生產(chǎn)者單消費(fèi)者的情況。如果改成只有一個(gè)消費(fèi)者,五個(gè)生產(chǎn)者,有沒有可能出現(xiàn)生產(chǎn)者被誤喚醒的情況?
答案是可能的。假設(shè)現(xiàn)在隊(duì)列里的數(shù)據(jù)滿了,而消費(fèi)者喚醒生產(chǎn)者的線程不是pthread_cond_signal(),而是pthread_cond_broadcast(),一下子喚醒五個(gè)生產(chǎn)者。
這時(shí)候問題就來了,如果消費(fèi)者只消費(fèi)了一個(gè)數(shù)據(jù)就全部喚醒了五個(gè)生產(chǎn)者,這五個(gè)生產(chǎn)者之前都通過if語句判斷通過在進(jìn)行等待,喚醒時(shí)都會從箭頭所指處繼續(xù)執(zhí)行代碼。都會執(zhí)行push語句,就可能超過隊(duì)列的容量上限。
這只是被誤喚醒的一個(gè)例子,實(shí)際中可能還要很多情況被誤喚醒。所以我們就要避免這種情況。
解決方法: if語句改成while即可
被喚醒的時(shí)候再判斷一下是否是滿了,滿了繼續(xù)等待,這樣就不怕被誤喚醒導(dǎo)致繼續(xù)執(zhí)行下面的代碼了。
void push(const T& in)//生產(chǎn)者把元素放進(jìn)容器
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
while(isFull())//判斷容器是否為滿
{
//如果滿了就進(jìn)行等待
pthread_cond_wait(&_full,&_mutex);
}
_q.push(in);//未滿,就生產(chǎn),放進(jìn)容器
//此時(shí)可以加一些策略,比如容量為多少時(shí)就喚醒,我們這里就不加了。
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mutex);
}
同理,消費(fèi)者也必須改成while
void pop(T* out)
{
pthread_mutex_lock(&_mutex);//加鎖保證線程安全
while(isEmpty())//判斷容器是否為空
{
pthread_cond_wait(&_empty,&_mutex);//如果為空,消費(fèi)者就進(jìn)行等待
}
*out = _q.front();//不為空,就取隊(duì)列頭部元素
_q.pop();//取出以后,隊(duì)列彈出該元素
pthread_cond_signal(&_full);
pthread_mutex_unlock(&_mutex);
}
細(xì)節(jié)2 生產(chǎn)者消費(fèi)者模型高效在哪里?
生產(chǎn)者消費(fèi)者模型就是生產(chǎn)者往容器里放元素,消費(fèi)者再從容器里取元素。同時(shí)為了保證線程安全,我們還給它加鎖了,所以是串行執(zhí)行的,那么它高效在哪呢?
思考這幾個(gè)問題:
- 生產(chǎn)者是不是也需要從外部獲取數(shù)據(jù)才能送到容器?
- 消費(fèi)者的數(shù)據(jù)是不是也要經(jīng)過業(yè)務(wù)處理后才能送出去?
- 生產(chǎn)者什么時(shí)候獲取數(shù)據(jù)的時(shí)候能干嘛?
- 消費(fèi)者送出數(shù)據(jù)的時(shí)候能干嘛?
首先生產(chǎn)者需要從外部獲取數(shù)據(jù)才能送到容器,在獲取數(shù)據(jù)的同時(shí)也能把以前的數(shù)據(jù)送到容器。消費(fèi)者要把處理后的數(shù)據(jù)送出去,送出去的同時(shí)也能從容器拿到新的數(shù)據(jù)。這就是生產(chǎn)者消費(fèi)者模型高效的表現(xiàn)。
多生產(chǎn)者多消費(fèi)者
我們可以接下來測試多生產(chǎn)多消費(fèi)者的情況了,由于線程間是串行執(zhí)行的,所以代碼肯定是能執(zhí)行的。
#include "block_queue.hpp"
#include <ctime>
#include <unistd.h>
using namespace std;
void* consumer(void* args)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*> (args);
while(true)
{
sleep(1);
//1.將數(shù)據(jù)從blockqueue中獲取
int data = 0;
bq->pop(&data);
//2.結(jié)合某種業(yè)務(wù)邏輯,處理數(shù)據(jù)
//這里先打印一下
cout << pthread_self() << " | "<<"consumer data : " << data << endl;
}
}
void* producer(void* args)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*> (args);
while(true)
{
sleep(1);
//1.生產(chǎn)者要先通過某種渠道獲得數(shù)據(jù),可以讓用戶從標(biāo)準(zhǔn)輸入輸入,也可以從網(wǎng)絡(luò)里讀
//這里我們簡單處理一下,自己創(chuàng)建隨機(jī)一些數(shù)據(jù)測試一下就行
int data = rand() % 10 + 1;
//2.將數(shù)據(jù)推送到blockqueue,完成生產(chǎn)過程
bq->push(data);
cout << pthread_self() << " | " << "prodecer data : " << data << endl;//打印查看
}
}
int main()
{
srand((uint64_t)time(nullptr) % 100000);//測試要用的數(shù)據(jù)
//這里是為了方便理解,先寫成單生產(chǎn)單消費(fèi)
BlockingQueue<int>* bq = new BlockingQueue<int>();
pthread_t c1,c2,p1,p2;//c是消費(fèi)者線程,p是生產(chǎn)者線程
pthread_create(&c1,nullptr,consumer,bq);//讓消費(fèi)者和生產(chǎn)者看到同一份隊(duì)列
pthread_create(&c2,nullptr,consumer,bq);//讓消費(fèi)者和生產(chǎn)者看到同一份隊(duì)列
pthread_create(&p1,nullptr,producer,bq);
pthread_create(&p2,nullptr,producer,bq);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
return 0;
}
使用ps -aL查看,包括線程在內(nèi),確實(shí)有五個(gè)線程在執(zhí)行。
題外話
我們在測試的時(shí)候,只測試了int數(shù)據(jù)類型的文章來源:http://www.zghlxwxcb.cn/news/detail-585218.html
BlockingQueue<int>* bq = new BlockingQueue<int>();
實(shí)際上,我們用的是一個(gè)類模板,也就說不僅僅可以傳簡單的數(shù)據(jù)類型,進(jìn)行簡單的數(shù)據(jù)處理,還可以傳相應(yīng)的類,類里面寫你要接收的數(shù)據(jù)和處理數(shù)據(jù)的方式,然后由生產(chǎn)者從外界接受數(shù)據(jù),存到對象里面,再把這個(gè)對象傳給容器,消費(fèi)者再拿出這個(gè)對象,根據(jù)類里面的處理數(shù)據(jù)的方式進(jìn)行處理,然后在發(fā)到外界。文章來源地址http://www.zghlxwxcb.cn/news/detail-585218.html
到了這里,關(guān)于【Linux系統(tǒng)】結(jié)合有趣的小故事讓你學(xué)懂生產(chǎn)者消費(fèi)者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!