生產(chǎn)者 - 消費(fèi)者模型 Producer-consumer problem 是一個非常經(jīng)典的多線程并發(fā)協(xié)作的模型,在分布式系統(tǒng)里非常常見。
一、為何要使用生產(chǎn)者消費(fèi)者模型
在多線程開發(fā)中,如果生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度很快,而消費(fèi)者消費(fèi)數(shù)據(jù)的速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者消費(fèi)完了數(shù)據(jù)才能夠繼續(xù)生產(chǎn)數(shù)據(jù),同理如果消費(fèi)者的速度大于生產(chǎn)者那么消費(fèi)者就會經(jīng)常處理等待狀態(tài),所以為了達(dá)到生產(chǎn)者和消費(fèi)者生產(chǎn)數(shù)據(jù)和消費(fèi)數(shù)據(jù)之間的平衡,那么就需要一個緩沖區(qū)用來存儲生產(chǎn)者生產(chǎn)的數(shù)據(jù),所以就引入了生產(chǎn)者-消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式就是通過一個容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的。
二、生產(chǎn)者消費(fèi)者模型的理解
1、生產(chǎn)者消費(fèi)者模型的特點(diǎn)
-
三種關(guān)系: 生產(chǎn)者和生產(chǎn)者(互斥關(guān)系)、消費(fèi)者和消費(fèi)者(互斥關(guān)系)、生產(chǎn)者和消費(fèi)者(互斥關(guān)系、同步關(guān)系)。
-
兩種角色: 生產(chǎn)者和消費(fèi)者。(通常由進(jìn)程或線程承擔(dān))
-
一個交易場所: 通常指的是內(nèi)存中的一段緩沖區(qū)。(可以自己通過某種數(shù)據(jù)結(jié)構(gòu)組織起來)
為什么生產(chǎn)者和消費(fèi)者之間為什么會存在互斥且同步關(guān)系?
-
互斥關(guān)系:由于所有的生產(chǎn)者和消費(fèi)者之間都是訪問的同一段緩沖區(qū)(臨界資源),為了避免出現(xiàn)數(shù)據(jù)不一致性,我們需要對訪問緩沖區(qū)的線程進(jìn)行加鎖,于是無論是生產(chǎn)者還是消費(fèi)者都要競爭同一把鎖,所以是互斥關(guān)系。
-
同步關(guān)系: 如果讓生產(chǎn)者一直生產(chǎn),那么當(dāng)生產(chǎn)者生產(chǎn)的數(shù)據(jù)將容器塞滿后,生產(chǎn)者再生產(chǎn)數(shù)據(jù)就會生產(chǎn)失敗。反之,讓消費(fèi)者一直消費(fèi),那么當(dāng)容器當(dāng)中的數(shù)據(jù)被消費(fèi)完后,消費(fèi)者再進(jìn)行消費(fèi)就會消費(fèi)失敗。為了讓生產(chǎn)者和消費(fèi)者線程之間協(xié)同起來就需要有同步關(guān)系!
2、生產(chǎn)者消費(fèi)者模型的優(yōu)點(diǎn)
-
解耦 :假設(shè)生產(chǎn)者和消費(fèi)者分別是兩個類。如果讓生產(chǎn)者直接調(diào)用消費(fèi)者的某個方法,那么生產(chǎn)者對于消費(fèi)者就會產(chǎn)生依賴(也就是耦合)。將來如果消費(fèi)者的代碼發(fā)生變化,可能會影響到生產(chǎn)者。而如果兩者都依賴于某個緩沖區(qū),兩者之間不直接依賴,耦合也就相應(yīng)降低了。
-
支持并發(fā) :生產(chǎn)者如果直接調(diào)用消費(fèi)者的某個方法,還有另一個弊端就是由于函數(shù)調(diào)用是同步的(或者叫阻塞的),在消費(fèi)者的方法沒有返回之前,生產(chǎn)者只好一直等在那邊。萬一消費(fèi)者處理數(shù)據(jù)很慢,生產(chǎn)者就會白白浪費(fèi)時間。
使用了生產(chǎn)者-消費(fèi)者模式之后,生產(chǎn)者和消費(fèi)者可以是兩個獨(dú)立的并發(fā)主體。生產(chǎn)者把制造出來的數(shù)據(jù)往緩沖區(qū)一丟,就可以再去生產(chǎn)下一個數(shù)據(jù)。基本上不用依賴消費(fèi)者的處理速度。 -
支持忙閑不均:緩沖區(qū)還有另一個好處。如果制造數(shù)據(jù)的速度時快時慢,緩沖區(qū)的好處就體現(xiàn)出來了。當(dāng)數(shù)據(jù)制造快的時候,消費(fèi)者來不及處理,未處理的數(shù)據(jù)可以暫時存在緩沖區(qū)中。等生產(chǎn)者的制造速度慢下來,消費(fèi)者再慢慢處理掉。
四、基于BlockQueue的生產(chǎn)者消費(fèi)者模型
在多線程編程中阻塞隊(duì)列(Block Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu)。其與普通的隊(duì)列區(qū)別在于:
- 當(dāng)隊(duì)列為空時,從隊(duì)列獲取元素的操作將會被阻塞,直到隊(duì)列中被放入了元素;
- 當(dāng)隊(duì)列滿時,往隊(duì)列里存放元素的操作也會被阻塞,直到有元素被從隊(duì)列中取出。
聯(lián)系: 管道的實(shí)現(xiàn)其實(shí)就是依據(jù)阻塞隊(duì)列實(shí)現(xiàn)的!
1、C++實(shí)現(xiàn)阻塞隊(duì)列
// 阻塞隊(duì)列
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
// 容量的默認(rèn)值
const size_t g_cap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(size_t cap = g_cap)
:_cap(cap)
{
// 對鎖和條件變量進(jìn)行初始化
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
pthread_cond_init(&_producerCond, nullptr);
}
// 插入數(shù)據(jù)
void push(const T& data)
{
// 加鎖
pthread_mutex_lock(&_mutex);
// 這里使用while能夠防止被誤喚醒(例如消費(fèi)者使用的是broadcast)
while (_q.size() == _cap)
{
// 不滿足生產(chǎn)條件,需要進(jìn)行等待
pthread_cond_wait(&_producerCond, &_mutex);
}
// 插入數(shù)據(jù)
_q.push(data);
// 喚醒隊(duì)列中的第一個消費(fèi)者
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
}
// 刪除數(shù)據(jù),out是一個輸出型數(shù)據(jù)表示刪除的值,如果不關(guān)心刪除的值可以傳nullptr
void pop(T* out)
{
// 加鎖
pthread_mutex_lock(&_mutex);
// 這里使用while能夠防止被誤喚醒(例如生成者使用的是broadcast)
while (_q.empty())
{
// 不滿足消費(fèi)條件,需要進(jìn)行等待
pthread_cond_wait(&_consumerCond, &_mutex);
}
if (out != nullptr)
{
*out = _q.front();
}
// 刪除數(shù)據(jù)
_q.pop();
// 喚醒隊(duì)列中的第一個生成者
pthread_cond_signal(&_producerCond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
// 對鎖和條件變量進(jìn)行銷毀
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_producerCond);
}
private:
// 隊(duì)列
std::queue<T> _q;
// 容量
size_t _cap;
// 互斥鎖
pthread_mutex_t _mutex;
// 生產(chǎn)者條件變量
pthread_cond_t _consumerCond;
// 消費(fèi)者條件變量
pthread_cond_t _producerCond;
};
2、一些注意事項(xiàng)
判斷是否滿足生產(chǎn)消費(fèi)條件時不能用if,而應(yīng)該用while:
-
pthread_cond_wait
函數(shù)是讓當(dāng)前執(zhí)行流進(jìn)行等待的函數(shù),是函數(shù)就意味著有可能調(diào)用失敗,調(diào)用失敗后該執(zhí)行流就會繼續(xù)往后執(zhí)行,有可能會導(dǎo)致產(chǎn)生的數(shù)據(jù)大于容量上限,或者隊(duì)列為空還在消費(fèi)。 -
其次,在多消費(fèi)者的情況下,當(dāng)生產(chǎn)者生產(chǎn)了一個數(shù)據(jù)后如果使用
pthread_cond_broadcast
函數(shù)喚醒消費(fèi)者,就會一次性喚醒多個消費(fèi)者,但待消費(fèi)的數(shù)據(jù)只有一個,此時其他消費(fèi)者就被偽喚醒了。
為了避免出現(xiàn)上述情況,我們就要讓線程被喚醒后再次進(jìn)行判斷,確認(rèn)是否真的滿足生產(chǎn)消費(fèi)條件,因此這里必須要用while進(jìn)行判斷。
為了測試這份代碼,我們先讓生產(chǎn)者生成的慢一些,讓消費(fèi)者消費(fèi)的快一些,我們應(yīng)該看到:生成者生成一個,消費(fèi)者消費(fèi)一個。
#include "BlockQueue.hpp"
#include <cstdlib>
#include <unistd.h>
using namespace std;
void* producer(void* args)
{
BlockQueue<int>* pbq = static_cast<BlockQueue<int>*> (args);
srand((unsigned int)time(nullptr));
while (true)
{
// 讓生產(chǎn)者生產(chǎn)的慢一些
sleep(1);
// 1.獲取數(shù)據(jù)
int data = rand() % 100;
// 2.將數(shù)據(jù)交給阻塞隊(duì)列
pbq->push(data);
cout << "A data is generated, it is :" << data << endl;
}
return nullptr;
}
void* consumer(void* args)
{
BlockQueue<int>* pbq = static_cast<BlockQueue<int>*> (args);
while (true)
{
//sleep(1); // 讓消費(fèi)者消費(fèi)的慢一些
// 1.從阻塞隊(duì)列中獲取數(shù)據(jù)
int out;
pbq->pop(&out);
// 2.處理數(shù)據(jù)
cout << "A data is consumed, it is :" << out << endl;
}
return nullptr;
}
int main()
{
pthread_t tp, tc;
BlockQueue<int> bq;
pthread_create(&tp, nullptr, producer, &bq);
pthread_create(&tc, nullptr, consumer, &bq);
pthread_join(tp, nullptr);
pthread_join(tc, nullptr);
return 0;
}
可以看到結(jié)果符合我們的預(yù)期:
接下來我們讓生成者生成快一些,讓消費(fèi)者消費(fèi)慢一些,我們應(yīng)該看到生產(chǎn)者先把阻塞隊(duì)列塞滿,然后消費(fèi)者消費(fèi)一個,生產(chǎn)者生成一個。
(將上述代碼的void* producer(void* args)
函數(shù)中的sleep
注釋掉,將void* consumer(void* args)
函數(shù)中的sleep
注釋去掉)
五、基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
1、信號量的原理
-
我們知道一把互斥鎖只能對一份臨界資源進(jìn)行保護(hù),當(dāng)我們對加鎖的資源使用時相當(dāng)于將這塊臨界資源看作一個整體,同一時刻只允許一個執(zhí)行流對這塊臨界資源進(jìn)行訪問。
-
但實(shí)際我們可以將這塊臨界資源再分割為多個區(qū)域,當(dāng)多個執(zhí)行流需要訪問臨界資源時,如果這些執(zhí)行流訪問的是臨界資源的不同區(qū)域,那么我們可以讓這些執(zhí)行流進(jìn)行同時訪問,此時并不會出現(xiàn)數(shù)據(jù)不一致等問題。
-
信號量(信號燈)本質(zhì)是一個計(jì)數(shù)器,是描述臨界資源中資源數(shù)目的計(jì)數(shù)器,信號量能夠更細(xì)粒度的對臨界資源進(jìn)行管理。
-
每個執(zhí)行流在進(jìn)入臨界區(qū)之前都應(yīng)該先申請信號量,申請成功就有了操作特點(diǎn)的臨界資源的權(quán)限,當(dāng)操作完畢后就應(yīng)該釋放信號量。
-
信號量的工作機(jī)制類似于我們看電影買票,是一種資源的預(yù)訂機(jī)制!不管線程對這份資源是用還是不用,這份資源一定是有的!如果申請信號量失敗,則線程會被掛起等待,直到有資源可以使用才會自動被喚醒。
-
如果將信號量的初始值設(shè)置為1,那么此時該信號量叫做二元信號量,說明信號量所描述的臨界資源只有一份,此時信號量的作用基本等價于互斥鎖。
信號量的PV操作:
P操作:我們將申請信號量稱為P操作,申請信號量的本質(zhì)就是申請獲得臨界資源中某塊資源的使用權(quán)限,當(dāng)申請成功時臨界資源中資源的數(shù)目應(yīng)該減一,因此P操作的本質(zhì)就是讓計(jì)數(shù)器減一。
V操作:我們將釋放信號量稱為V操作,釋放信號量的本質(zhì)就是歸還臨界資源中某塊資源的使用權(quán)限,當(dāng)釋放成功時臨界資源中資源的數(shù)目就應(yīng)該加一,因此V操作的本質(zhì)就是讓計(jì)數(shù)器加一。
此外,PV操作是原子操作,只有這樣才能保證信號量的線程安全
2、POSIX信號量
POSIX
信號量和SystemV
信號量作用相同,都是用于同步操作,達(dá)到無沖突的訪問共享資源目的。 但POSIX
可以用于線程間同步。
使用下面的函數(shù)需要包含頭文件 : #include <semaphore.h>
,并鏈接庫-lpthread
初始化信號量函數(shù):
int sem_init(sem_t *sem, int pshared, unsigned int value);
參數(shù)說明:
-
sem
:需要初始化的信號量。 -
pshared
:傳入0值表示線程間共享,傳入非零值表示進(jìn)程間共享。 -
value
:計(jì)數(shù)器的初始值。
返回值說明:
- 初始化信號量成功返回0,失敗返回-1。
銷毀信號量
int sem_destroy(sem_t *sem);
參數(shù)說明:
-
sem
:需要銷毀的信號量。
返回值說明:
- 銷毀信號量成功返回0,失敗返回-1。
等待信號量
功能:等待信號量,會將信號量的值減1
int sem_wait(sem_t *sem);
參數(shù)說明:
-
sem
:需要等待的信號量。
返回值說明:
- 等待信號量成功返回0,信號量的值減1。
- 等待信號量失敗返回-1,信號量的值保持不變。
發(fā)布信號量
int sem_post(sem_t *sem);
參數(shù)說明:
-
sem
:需要發(fā)布的信號量。
返回值說明:
- 發(fā)布信號量成功返回0,信號量的值加一。
- 發(fā)布信號量失敗返回-1,信號量的值保持不變。
3、基于環(huán)形隊(duì)列的生產(chǎn)消費(fèi)模型
在阻塞隊(duì)列中,我們將隊(duì)列作為整體使用,生產(chǎn)者和消費(fèi)者在同一時刻只能有一個人進(jìn)行訪問,但是在環(huán)形隊(duì)列里面我們可以發(fā)現(xiàn),生產(chǎn)者和消費(fèi)者關(guān)心的內(nèi)容是不一樣的!
- 生產(chǎn)者關(guān)心空間,消費(fèi)者關(guān)心的是數(shù)據(jù),環(huán)形隊(duì)列只要生產(chǎn)者和消費(fèi)者訪問不同的區(qū)域,生產(chǎn)和消費(fèi)行為可以同時并發(fā)進(jìn)行。
那么它們什么時候會訪問同一塊區(qū)域呢?
-
剛開始時,數(shù)據(jù)為空,空間為滿,生產(chǎn)者和消費(fèi)者指向同一個位置,存在競爭關(guān)系,這時我們應(yīng)該讓生產(chǎn)者先運(yùn)行!
-
當(dāng)數(shù)據(jù)為滿,空間為空,生產(chǎn)者和消費(fèi)者指向同一個位置,存在競爭關(guān)系,這時我們應(yīng)該讓消費(fèi)者先運(yùn)行!
4、代碼實(shí)現(xiàn)
#include <vector>
#include <semaphore.h>
#include <pthread.h>
template<class T>
class RingQueue
{
public:
// 構(gòu)造函數(shù)設(shè)置默認(rèn)環(huán)形隊(duì)列的大小是5
RingQueue(int cap = 5)
:_cap(cap), _ring(cap),_c_step(0), _p_step(0)
{
// 剛開始時,數(shù)據(jù)信號量為0
sem_init(&_data_sem, 0, 0);
// 剛開始時,空間信號量為cap
sem_init(&_space_sem, 0, cap);
pthread_mutex_init(&_c_step_mtx, nullptr);
pthread_mutex_init(&_p_step_mtx, nullptr);
}
// 插入數(shù)據(jù)
void push(const T& data)
{
// 生產(chǎn)者申請空間資源,P操作
sem_wait(&_space_sem);
// 信號量申請成功,必定有資源可以使用,具體是哪一個資源由程序員分配資源
//_p_step是臨界資源
pthread_mutex_lock(&_p_step_mtx);
_ring[_p_step++] = data;
// 防止越界
_p_step %= _cap;
pthread_mutex_unlock(&_p_step_mtx);
// 釋放對方關(guān)心的信號量,增加了一個數(shù)據(jù),V操作
sem_post(&_data_sem);
}
void pop(T* out)
{
// 消費(fèi)者者申請數(shù)據(jù)資源,P操作
sem_wait(&_data_sem);
pthread_mutex_lock(&_c_step_mtx);
*out = _ring[_c_step++];
_c_step %= _cap;
pthread_mutex_unlock(&_c_step_mtx);
// 釋放對方關(guān)心的信號量,增加了一個空間,V操作
sem_post(&_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
}
private:
int _cap; // 容量
std::vector<T> _ring; // 容器
sem_t _data_sem; // 數(shù)據(jù)信號量
sem_t _space_sem; // 空間信號量
size_t _c_step; // 消費(fèi)者位置
size_t _p_step; // 生產(chǎn)者位置
pthread_mutex_t _c_step_mtx; // _c_step對應(yīng)的鎖
pthread_mutex_t _p_step_mtx; // _p_step對應(yīng)的鎖
};
相關(guān)說明:
-
生產(chǎn)者 / 消費(fèi)者每次生產(chǎn)數(shù)據(jù)后
_p_step
/_c_step
都會進(jìn)行++,標(biāo)記下一次生產(chǎn)/消費(fèi)數(shù)據(jù)的存放位置,++后的下標(biāo)會與環(huán)形隊(duì)列的容量進(jìn)行取模運(yùn)算,實(shí)現(xiàn)“環(huán)形”的效果。文章來源:http://www.zghlxwxcb.cn/news/detail-739135.html -
盡管我們已經(jīng)通過信號量保證了生產(chǎn)者和消費(fèi)者大部分情況下在該環(huán)形隊(duì)列可以讓生產(chǎn)者和消費(fèi)者并發(fā)的執(zhí)行,但是由于生產(chǎn)者和生產(chǎn)者,消費(fèi)者和消費(fèi)者存在競爭關(guān)系,所以我們還需要兩把鎖,
_c_step_mtx
,_p_step_mtx
來保證它們之間的競爭關(guān)系。文章來源地址http://www.zghlxwxcb.cn/news/detail-739135.html
到了這里,關(guān)于【Linux】深入理解生產(chǎn)者消費(fèi)者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!