生產(chǎn)者消費(fèi)者模型
123規(guī)則
1個(gè)線程安全的隊(duì)列:只要保證先進(jìn)先出特性的數(shù)據(jù)結(jié)構(gòu)都可以稱為隊(duì)列
這個(gè)隊(duì)列要保證互斥(就是保證當(dāng)前只有一個(gè)線程對(duì)隊(duì)列進(jìn)行操作,其他線程不可以同時(shí)來操作),還要保證同步,當(dāng)生產(chǎn)者將隊(duì)列中填充滿了之后要通知消費(fèi)者來進(jìn)行消費(fèi),消費(fèi)者消費(fèi)之后通知生產(chǎn)者來進(jìn)行生產(chǎn)。
隊(duì)列起到了生產(chǎn)者和消費(fèi)者的緩沖作用,生產(chǎn)者不用因?yàn)闆]有人消費(fèi)發(fā)愁,只需要將生產(chǎn)的數(shù)據(jù)放到隊(duì)列中即可;消費(fèi)者不用因?yàn)樯a(chǎn)者生產(chǎn)了大量數(shù)據(jù)而發(fā)愁,只需要正常關(guān)注正在處理的數(shù)據(jù)即可
2個(gè)角色的線程:生產(chǎn)者和消費(fèi)者
3個(gè)規(guī)則:生產(chǎn)者和生產(chǎn)者互斥、消費(fèi)者和消費(fèi)者互斥、生產(chǎn)者和消費(fèi)者互斥+同步
應(yīng)用場(chǎng)景
比如說微信的后臺(tái)程序:在不同的場(chǎng)景下一個(gè)進(jìn)程可以是消費(fèi)者也可以是生產(chǎn)者
優(yōu)點(diǎn)
忙閑不均
在同一時(shí)刻可能接收消息的線程不忙而處理消息的線程一直處于工作狀態(tài)
生產(chǎn)者和消費(fèi)者解耦
生產(chǎn)者只關(guān)心生產(chǎn),關(guān)心隊(duì)列是否有空閑空間;
消費(fèi)者只關(guān)心消費(fèi),關(guān)心隊(duì)列中是否有數(shù)據(jù)可用。
生產(chǎn)者和消費(fèi)者不是串行的執(zhí)行(串行的處理就是當(dāng)一個(gè)線程接收到消息后才可以處理消息,并且只有處理完了之后才可以發(fā)送消息,是一個(gè)串行的過程),而生產(chǎn)者消費(fèi)者模型將生產(chǎn)者和消費(fèi)者解耦,接收消息的一輩子就接收消息,處理消息的一輩子就處理消息,發(fā)送消息一輩子就只發(fā)送消息,不受其他線程的影響
支持高并發(fā)
同一時(shí)刻多個(gè)人發(fā)送消息這種情況是支持的,因?yàn)榻邮障⒌木€程只需要接收消息,不用干其他事情,所以接收線程接收消息的速度很快
代碼模擬
采用互斥和同步實(shí)現(xiàn):
#include<stdio.h>
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>
using namespace std;
#define THREAD_COUNT 1//生產(chǎn)者和消費(fèi)者數(shù)量
//創(chuàng)建線程安全隊(duì)列
class RingQueue{
public:
RingQueue(){
capacity = 1;
pthread_mutex_init(&que_lock, NULL);
pthread_cond_init(&consum_cond, NULL);
pthread_cond_init(&product_cond, NULL);
}
~RingQueue(){
pthread_mutex_destroy(&que_lock);
pthread_cond_destroy(&consum_cond);
pthread_cond_destroy(&product_cond);
}
//往隊(duì)列中放數(shù)據(jù),生產(chǎn)
void Push(int data){
pthread_mutex_lock(&que_lock);
while(que.size()>=capacity){
pthread_cond_wait(&product_cond, &que_lock);
//為什么要用while循環(huán)呢?
//因?yàn)楫?dāng)生產(chǎn)者被喚醒后,需要再次判斷隊(duì)列是否可以滿足生產(chǎn)的條件
//生產(chǎn)者或者消費(fèi)者都是需要在等待結(jié)束后再次判斷的
}
que.push(data);//生產(chǎn),往隊(duì)列中放入數(shù)據(jù)
cout<<"I am product: " << pthread_self() << "I product number is " << data << endl;
pthread_mutex_unlock(&que_lock);
pthread_cond_signal(&consum_cond);
//生產(chǎn)者完成生產(chǎn)后喚醒消費(fèi)者線程讓消費(fèi)者進(jìn)行消費(fèi)
}
//從隊(duì)列中取數(shù)據(jù),消費(fèi)
int Pop(){
pthread_mutex_lock(&que_lock);
while(que.size() <= 0){
pthread_cond_wait(&consum_cond, &que_lock);
}
int data = que.front();
que.pop();
cout<<"I am consume: " << pthread_self() << "I consume number is " << data << endl;
pthread_mutex_unlock(&que_lock);
pthread_cond_signal(&product_cond);//消費(fèi)者線程消費(fèi)之后通知生產(chǎn)者來生產(chǎn)
return data;
}
private:
queue<int> que;//線程安全的隊(duì)列
//給隊(duì)列一把鎖,保證互斥,保證同一時(shí)刻只有一個(gè)線程對(duì)隊(duì)列進(jìn)行操作
pthread_mutex_t que_lock;
//同步的條件變量,隊(duì)列有元素,消息,沒有元素等待,喚醒生產(chǎn)者
//保證生產(chǎn)者在隊(duì)列中沒有元素的時(shí)候進(jìn)行生產(chǎn)(插入元素)
pthread_cond_t consum_cond;
pthread_cond_t product_cond;
int capacity;//隊(duì)列容量,隊(duì)列元素大于容量表示隊(duì)滿,不再往里插入元素
};
int g_val = 0;
pthread_mutex_t g_val_lock = PTHREAD_MUTEX_INITIALIZER;//靜態(tài)初始化保護(hù)g_val的互斥鎖
void* product_thread_start(void* arg){
RingQueue *q = (RingQueue*)arg;
while(1){
pthread_mutex_lock(&g_val_lock);//獲取g_val的互斥鎖
q->Push(g_val);
g_val++;
sleep(1);
pthread_mutex_unlock(&g_val_lock);
}
}
void* consum_thread_start(void* arg){
RingQueue *q = (RingQueue*)arg;
while(1){
q->Pop();
}
}
int main(){
pthread_t consum_tid[THREAD_COUNT];
pthread_t product_tid[THREAD_COUNT];
RingQueue* q = new RingQueue();
for(int i=0; i<THREAD_COUNT; ++i){
int ret = pthread_create(&consum_tid[i], NULL, consum_thread_start, (void*)q);
if(ret < 0){
perror("pthread_create");
return 0;
}
ret = pthread_create(&product_tid[i], NULL, product_thread_start, (void*)q);
if(ret < 0){
perror("pthread_create");
return 0;
}
}
for(int i=0; i<THREAD_COUNT; ++i){
pthread_join(consum_tid[i], NULL);
pthread_join(product_tid[i], NULL);
}
delete q;
return 0;
}
執(zhí)行結(jié)果:
文章來源:http://www.zghlxwxcb.cn/news/detail-691789.html
可以看到有效的控制了生產(chǎn)者和消費(fèi)者的消費(fèi)順序,當(dāng)生產(chǎn)者生產(chǎn)一個(gè)消費(fèi)者就消費(fèi)一個(gè),消費(fèi)者消費(fèi)后生產(chǎn)者接著生產(chǎn)文章來源地址http://www.zghlxwxcb.cn/news/detail-691789.html
到了這里,關(guān)于【Linux】線程安全-生產(chǎn)者消費(fèi)者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!