一、了解生產(chǎn)者消費(fèi)者模型
舉個(gè)例子:學(xué)生要買東西,一般情況下都會(huì)直接聯(lián)系廠商,因?yàn)橘I的商品不多,對(duì)于供貨商來說交易成本太高,所以有了交易場(chǎng)所超市這個(gè)媒介的存在。目的就是為了集中需求,分發(fā)產(chǎn)品。
消費(fèi)者與生產(chǎn)者之間通過了超市進(jìn)行交易。當(dāng)生產(chǎn)者不需要的時(shí)候,廠商可以繼續(xù)生產(chǎn),當(dāng)廠商不再生產(chǎn)的時(shí)候消費(fèi)者購買商品!
上述生產(chǎn)的過程和消費(fèi)的過程互相影響的程度很低——解耦
臨時(shí)的保存產(chǎn)品的場(chǎng)所——緩沖區(qū)
函數(shù)調(diào)用:main函數(shù)通過用戶輸入生產(chǎn)了數(shù)據(jù),用變量保存了數(shù)據(jù),要調(diào)用的函數(shù)消費(fèi)了數(shù)據(jù),當(dāng)main函數(shù)調(diào)用func函數(shù),main函數(shù)就會(huì)阻塞等待func函數(shù)返回,這種情況稱為強(qiáng)耦合關(guān)系。
利用生產(chǎn)者消費(fèi)者模式可以解決強(qiáng)耦合問題,將串行調(diào)用改為并行執(zhí)行,提高執(zhí)行效率,完成邏輯的解耦。
二、生產(chǎn)者與消費(fèi)者模型的幾種關(guān)系及特點(diǎn)
對(duì)消費(fèi)者與生產(chǎn)者模型,可以用以下321原則說明
-
三種關(guān)系:生產(chǎn)者和生產(chǎn)者(互斥),消費(fèi)者和消費(fèi)者(互斥),生產(chǎn)者和消費(fèi)者(互斥&&同步),互斥保證共享資源的安全性,同步是為了提高訪問效率
-
二種角色:生產(chǎn)者線程,消費(fèi)者線程
-
一個(gè)交易場(chǎng)所:一段特定結(jié)構(gòu)的緩沖區(qū)
生產(chǎn)消費(fèi)模型的特點(diǎn)
-
未來生產(chǎn)線程和消費(fèi)線程進(jìn)行解耦
-
支持生產(chǎn)和消費(fèi)的一段時(shí)間的忙閑不均的問題(緩存區(qū)有數(shù)據(jù)有空間)
-
生產(chǎn)者專注生產(chǎn),消費(fèi)專注消費(fèi),提高效率
如果緩沖區(qū)滿了,生產(chǎn)者只能進(jìn)行等待,如果超市緩沖區(qū)為空,消費(fèi)者只能進(jìn)行等待。
三、BlockQueue(阻塞隊(duì)列)
3.1 基礎(chǔ)版阻塞隊(duì)列
阻塞隊(duì)列:阻塞隊(duì)列(Blocking Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu)
阻塞隊(duì)列為空時(shí),從阻塞隊(duì)列中獲取元素的線程將被阻塞,直到阻塞隊(duì)列被放入元素。
阻塞隊(duì)列已滿時(shí),往阻塞隊(duì)列放入元素的線程將被阻塞,直到有元素被取出。
單生產(chǎn)單消費(fèi)測(cè)試
//BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
using namespace std;
const int gmaxcap=5;
template<class T>
class BlockQueue
{
private:
std::queue<T> q_;
int maxcap_;//隊(duì)列容量
pthread_mutex_t mutex_;
pthread_cond_t pcond_;//生產(chǎn)者對(duì)應(yīng)的條件變量
pthread_cond_t ccond_;//消費(fèi)者者對(duì)應(yīng)的條件變量
public:
BlockQueue(const int& maxcap=gmaxcap)
:maxcap_(maxcap)
{
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&pcond_,nullptr);
pthread_cond_init(&ccond_,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&pcond_);
pthread_cond_destroy(&ccond_);
}
void push(const T &in)//輸入性參數(shù) &
{
pthread_mutex_lock(&mutex_);
//1.判斷
//細(xì)節(jié)2:充當(dāng)條件變量的語法必須是while,不能是if
while(is_full())
{
//細(xì)節(jié)1:該函數(shù)會(huì)以原子性的方式將鎖釋放,并將自己掛起
//被喚醒的時(shí)候會(huì)自動(dòng)獲取傳入的鎖
pthread_cond_wait(&pcond_,&mutex_);//緩沖區(qū)滿,生產(chǎn)者阻塞等待
}
//2.這一步一定沒有滿
q_.push(in);
//3.堵塞隊(duì)列一定有數(shù)據(jù)
//細(xì)節(jié)3:?jiǎn)拘研袨榭梢苑旁诮怄i前也可以放在解鎖后
pthread_cond_signal(&ccond_);//喚醒消費(fèi)者
pthread_mutex_unlock(&mutex_);
//pthread_cond_signal(&ccond_);//喚醒消費(fèi)者
}
void pop(T* out)//輸出型參數(shù):*
{
pthread_mutex_lock(&mutex_);
//1.判斷
while(is_empty())
{
pthread_cond_wait(&ccond_,&mutex_);//緩沖區(qū)空,消費(fèi)者阻塞等待
}
//2.這一步一定沒有滿
*out = q_.front();
q_.pop();
//3.堵塞隊(duì)列一定沒有滿
pthread_cond_signal(&pcond_);//喚醒生產(chǎn)者
pthread_mutex_unlock(&mutex_);
}
private:
bool is_empty()
{
return q_.empty();
}
bool is_full()
{
return q_.size()==maxcap_;
}
};
//Main.cc
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
void* productor(void* bq_)//生產(chǎn)
{
BlockQueue<int> * bq=static_cast<BlockQueue<int>*>(bq_);
while (true)
{
int data=rand()%10+1;
bq->push(data);
std::cout<<"生產(chǎn)數(shù)據(jù): "<<data<<std::endl;
}
return nullptr;
}
void* consumer(void* bq_)//消費(fèi)
{
BlockQueue<int> * bq=static_cast<BlockQueue<int>*>(bq_);
while (true)
{
int data;
bq->pop(&data);
std::cout<<"消費(fèi)數(shù)據(jù): "<<data<<std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
BlockQueue<int> * bq=new BlockQueue<int>();
pthread_t c,p;
pthread_create(&c,nullptr,consumer,bq);
pthread_create(&p,nullptr,productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
控制生產(chǎn)速度,即每間隔1s生產(chǎn)一次,生產(chǎn)一個(gè)消費(fèi)一個(gè),而且消費(fèi)的都是最新的數(shù)據(jù)
控制消費(fèi)速度,即每間隔1s消費(fèi)一次,剛開始生產(chǎn)多個(gè),穩(wěn)定后生產(chǎn)一個(gè)消費(fèi)一個(gè),消費(fèi)的是以前的數(shù)據(jù)
以上代碼的三個(gè)細(xì)節(jié)
- 細(xì)節(jié)一:
pthread_cond_wait(&pcond_,&mutex_);
第二個(gè)參數(shù)是鎖,該函數(shù)調(diào)用會(huì)以原子性的方式將鎖釋放,并將自己掛起;被喚醒的時(shí)候會(huì)自動(dòng)獲取傳入的鎖- 細(xì)節(jié)二:判斷空和滿的時(shí)候要用while,存在多個(gè)生產(chǎn)者因滿掛起后,消費(fèi)者使用一個(gè)后,同時(shí)喚醒所有生產(chǎn)者,導(dǎo)致數(shù)據(jù)多增加;
pthread_cond_signal偽喚醒:假設(shè)生產(chǎn)者有5個(gè),消費(fèi)者只有一個(gè),消費(fèi)一下數(shù)據(jù),如果是pthread_cond_broadcast(),把5個(gè)線程同時(shí)喚醒,可是只需要生產(chǎn)一個(gè)數(shù)據(jù),而同時(shí)把5個(gè)線程喚醒,是if判斷的時(shí)候就會(huì)多次push溢出問題
前提條件:每個(gè)生產(chǎn)者進(jìn)入函數(shù),拿到鎖資源后,判斷為假,調(diào)用wait(),釋放鎖資源讓其他線程進(jìn)來,到最后五個(gè)生產(chǎn)者者同時(shí)處于wait狀態(tài),消費(fèi)者將其全部喚醒,5個(gè)生產(chǎn)者挨個(gè)重新獲取鎖資源接著執(zhí)行代碼,就會(huì)導(dǎo)致上述問題- 細(xì)節(jié)三:?jiǎn)拘研袨榭梢苑旁诮怄i前也可以放在解鎖后。解鎖前喚醒:?jiǎn)拘阎竽硞€(gè)生產(chǎn)者得到鎖的優(yōu)先級(jí)高,消費(fèi)者釋放,生產(chǎn)者立馬拿到;解鎖后喚醒:隨機(jī)被某個(gè)消費(fèi)者拿走鎖,不影響
3.2 基于任務(wù)版的阻塞隊(duì)列
基于上述代碼,新建一個(gè)Task.hpp,用來給線程派發(fā)任務(wù)執(zhí)行任務(wù)
BlockQueue.hpp如上
/*****************/
#pragma once
#include <iostream>
#include <cstdio>
#include <functional>
class Task
{
public:
using func_t =std::function<int(int,int,char)>;
Task(){}
Task(int x,int y,char op,func_t callback)
:x_(x),y_(y),op_(op),callback_(callback)
{
}
std::string operator()()
{
int result=callback_(x_,y_,op_);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
return buffer;
}
private:
int x_;
int y_;
char op_;
func_t callback_;
};
/**************************/
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"
const std::string oper = "+-*/%";
int mymath(int x,int y,char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
break;
}
return result;
}
void* productor(void* bq_)//生產(chǎn)
{
BlockQueue<Task> * bq=static_cast<BlockQueue<Task>*>(bq_);
while (true)
{
int x=rand()%100+1;
int y=rand()%10;
int operCode=rand() % oper.size();
Task t(x,y,oper[operCode],mymath);
bq->push(t);
std::cout<<"生產(chǎn)任務(wù): "<<t.toTaskString()<<std::endl;
sleep(1);
}
return nullptr;
}
void* consumer(void* bq_)//消費(fèi)
{
BlockQueue<Task> * bq=static_cast<BlockQueue<Task>*>(bq_);
while (true)
{
Task t;
bq->pop(&t);
std::cout<<"消費(fèi)任務(wù):"<<t()<<std::endl;
//sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
BlockQueue<Task> * bq=new BlockQueue<Task>();
pthread_t c,p;
pthread_create(&c,nullptr,consumer,bq);
pthread_create(&p,nullptr,productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
3.3 進(jìn)階版生產(chǎn)消費(fèi)模型–生產(chǎn)、消費(fèi)、保存
任務(wù)目標(biāo):
- 生產(chǎn)者(線程1)生產(chǎn)任務(wù)加入到計(jì)算任務(wù)隊(duì)列中
- 消費(fèi)者&生產(chǎn)者(線程2)消費(fèi)計(jì)算隊(duì)列中任務(wù)并將計(jì)算結(jié)果推送到存儲(chǔ)任務(wù)隊(duì)列中
- 消費(fèi)者(線程3)消費(fèi)存儲(chǔ)任務(wù)隊(duì)列,將結(jié)果保存到文件中
設(shè)計(jì)思路
生產(chǎn)者productor將計(jì)算任務(wù)CalTask,push到計(jì)算隊(duì)列中
消費(fèi)者&生產(chǎn)者consumer獲取計(jì)算任務(wù)CalTask,并將計(jì)算任務(wù)結(jié)果結(jié)合Save方法構(gòu)造一個(gè)SaveTask對(duì)象,然后將這個(gè)對(duì)象push到存儲(chǔ)隊(duì)列中
消費(fèi)者saver拿到存儲(chǔ)任務(wù),通過回調(diào)函數(shù)將數(shù)據(jù)寫進(jìn)文件中
代碼實(shí)現(xiàn)如下:
/*Main.cc*/
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"
//定義一個(gè)隊(duì)列保存計(jì)算任務(wù)隊(duì)列和保存任務(wù)隊(duì)列
template<class C,class S>
class TwoBlockQueue
{
public:
BlockQueue<C> * c_bq;
BlockQueue<S> * s_bq;
};
void* productor(void* bqs)//生產(chǎn)
{
BlockQueue<CalTask> * bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->c_bq;
while (true)
{
int x=rand()%100+1;
int y=rand()%10;
int operCode=rand() % oper.size();
CalTask t(x,y,oper[operCode],mymath);
bq->push(t);
std::cout<<"productor->生產(chǎn)任務(wù): "<<t.toTaskString()<<std::endl;
sleep(1);
}
return nullptr;
}
void* consumer(void* bqs)//消費(fèi)
{
//拿到計(jì)算隊(duì)列
BlockQueue<CalTask> * bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->c_bq;
//拿到保存隊(duì)列
BlockQueue<SaveTask> * save_bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->s_bq;
while (true)
{
//得到任務(wù)并處理
CalTask t;
bq->pop(&t);
string result=t();//
std::cout<<"consumer->消費(fèi)任務(wù):"<<result<<std::endl;
//存儲(chǔ)任務(wù)
SaveTask save(result,Save);
save_bq->push(save);
std::cout<<"consumer->推送保存任務(wù)完成..."<<std::endl;
//sleep(1);
}
return nullptr;
}
void* saver(void* bqs)
{
BlockQueue<SaveTask> * save_bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->s_bq;
while (true)
{
SaveTask t;
save_bq->pop(&t);
t();
cout<<"saver->保存任務(wù)完成"<<endl;
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
TwoBlockQueue<CalTask,SaveTask> bqs;
bqs.c_bq=new BlockQueue<CalTask>();
bqs.s_bq=new BlockQueue<SaveTask>();
pthread_t c,p,s;
pthread_create(&c,nullptr,consumer,&bqs);
pthread_create(&p,nullptr,productor,&bqs);
pthread_create(&s,nullptr,saver,&bqs);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
pthread_join(s,nullptr);
delete bqs.c_bq;
delete bqs.s_bq;
return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstring>
#include <functional>
class CalTask//計(jì)算任務(wù)
{
public:
using func_t =std::function<int(int,int,char)>;
CalTask(){}
CalTask(int x,int y,char op,func_t callback)
:x_(x),y_(y),op_(op),callback_(callback)
{
}
std::string operator()()
{
int result=callback_(x_,y_,op_);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
return buffer;
}
private:
int x_;
int y_;
char op_;
func_t callback_;
};
const std::string oper = "+-*/%";
int mymath(int x,int y,char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
std::cerr << "div zero error!" << std::endl;
result = -1;
}
else
result = x / y;
}
break;
case '%':
{
if (y == 0)
{
std::cerr << "mod zero error!" << std::endl;
result = -1;
}
else
result = x % y;
}
break;
default:
break;
}
return result;
}
class SaveTask
{
typedef std::function<void(const std::string&)> func_t;
public:
SaveTask(){}
SaveTask(const std::string& message,func_t func)
:message_(message),func_(func)
{
}
void operator()()
{
func_(message_);
}
private:
std::string message_;
func_t func_;
};
void Save(const std::string& message)
{
const std::string target="./log.txt";
FILE* fp=fopen(target.c_str(),"a+");
if(!fp)
{
std::cerr<<"fopen error"<<endl;
return;
}
fputs(message.c_str(),fp);
fputs("\n",fp);
fclose(fp);
}
四、小結(jié)
阻塞隊(duì)列也適用于多生產(chǎn)者多消費(fèi)
在阻塞隊(duì)列中,無論外部線程再多,真正進(jìn)入到阻塞隊(duì)列里生產(chǎn)或消費(fèi)的線程永遠(yuǎn)只有一個(gè)。
在一個(gè)任務(wù)隊(duì)列中,有多個(gè)生產(chǎn)者與多個(gè)消費(fèi)者,由于有鎖的存在,所以任意時(shí)刻只有一個(gè)執(zhí)行流在阻塞隊(duì)列里放或者取。
生產(chǎn)消費(fèi)模型高效體現(xiàn)在哪里
高效并不是體現(xiàn)在從隊(duì)列中消費(fèi)數(shù)據(jù)高效!
而是我們可以讓一個(gè)、多個(gè)線程并發(fā)的同時(shí)計(jì)算多個(gè)任務(wù)!在計(jì)算多個(gè)任務(wù)的同時(shí),并不影響其他線程繼續(xù)從隊(duì)列里拿任務(wù)的過程。
也就是說,生產(chǎn)者消費(fèi)者模型的高效:可以在生產(chǎn)之前與消費(fèi)之后讓線程并行執(zhí)行文章來源:http://www.zghlxwxcb.cn/news/detail-431270.html
生產(chǎn)任務(wù)需要花費(fèi)時(shí)間,不是把任務(wù)放進(jìn)隊(duì)列就完事了;消費(fèi)任務(wù)也是需要時(shí)間的,不是把任務(wù)從隊(duì)列中拿出來就完事了,還要處理它,處理它期間不影響其它線程消費(fèi),反之亦然,這才是生產(chǎn)者與消費(fèi)者模型的高效體現(xiàn)!?。?span toymoban-style="hidden">文章來源地址http://www.zghlxwxcb.cn/news/detail-431270.html
到了這里,關(guān)于Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!