喜歡的點(diǎn)贊,收藏,關(guān)注一下把!
1.線程同步
在線程互斥寫了一份搶票的代碼,我們發(fā)現(xiàn)雖然加鎖解決了搶到負(fù)數(shù)票的問題,但是一直都是一個(gè)線程在搶票,它錯(cuò)了嗎,它沒錯(cuò)但是不合理。那我們應(yīng)該如何安全合理的搶票呢?
講個(gè)小故事。
假設(shè)學(xué)校有一個(gè)VIP學(xué)霸自習(xí)室,這個(gè)自習(xí)室有一個(gè)特點(diǎn),里面的環(huán)境巨好,但是只有一張桌椅板凳,只允許一個(gè)人進(jìn)去學(xué)習(xí)。要去這個(gè)自習(xí)室學(xué)習(xí)學(xué)校規(guī)定必須是先到先得,這個(gè)自習(xí)室推出的時(shí)候在門口打了一根釘子掛了一把鑰匙,早上誰先來誰拿這把鑰匙進(jìn)去把門反鎖學(xué)習(xí),此時(shí)他就互斥式的訪問了這個(gè)自習(xí)室,你在自習(xí)室學(xué)習(xí)了兩小時(shí),你想上個(gè)廁所,此時(shí)你把自習(xí)室門打開,看到大家三五成群的議論著誰先來的,當(dāng)你出來時(shí)大家都看著你,你就想上個(gè)廁所馬上就回來,此時(shí)你把鑰匙放在口袋里上廁所去了,在你上廁所期間別人也進(jìn)不去這個(gè)自習(xí)室,就叫做什么,當(dāng)你這個(gè)線程執(zhí)行流被切走的時(shí)候你不是一個(gè)人走的,你是抱著鑰匙走了,然后你上完廁所回來把門一開進(jìn)去再反鎖又學(xué)了兩小時(shí),此時(shí)門口人越來越多,你不好意思了,你出來把鑰匙掛在墻上,你此時(shí)是距離鑰匙最近的,當(dāng)你正想離開時(shí)你想這么走是不是有點(diǎn)遺憾,好不容易起一大早。不行還得在學(xué)一會(huì),你又拿著鑰匙進(jìn)去又學(xué)了一小時(shí),這叫什么,這是你當(dāng)前離這個(gè)資源最近,競(jìng)爭(zhēng)能力最強(qiáng)你此時(shí)又拿到這個(gè)鑰匙又進(jìn)去反鎖學(xué)習(xí)了。然后你餓了想去吃飯,開門放鑰匙想著不知道一會(huì)什么時(shí)候在到你于是你又拿鑰匙進(jìn)去了,其他同學(xué)就看到這個(gè)人出來掛鑰匙又拿鑰匙再進(jìn)去,因?yàn)槟阕罱?,所以大家就看到一個(gè)人在瘋狂的開門放鑰匙拿鑰匙關(guān)門,這樣這間自習(xí)室就沒有創(chuàng)造價(jià)值!
此時(shí)我們把其他在門口的人長(zhǎng)時(shí)間得不到鎖資源而無法訪問公共資源的人,處于門口這些同學(xué)叫做處于饑餓狀態(tài)。一個(gè)線程頻繁申請(qǐng)鎖資源而導(dǎo)致其他線程得不到鎖資源這就叫做饑餓問題。
這個(gè)同學(xué)錯(cuò)了嗎?他沒錯(cuò),學(xué)校就是這樣規(guī)定的啊,所以他沒錯(cuò),但是不合理。學(xué)校經(jīng)過大量同學(xué)的反應(yīng)于是出了一個(gè)規(guī)定:1.所有在門口等待的同學(xué)必須排隊(duì),2.從自習(xí)室出來的同學(xué)不能立馬申請(qǐng)鎖必須先排到當(dāng)前隊(duì)列的尾部然后到你的時(shí)候才能拿鑰匙。
所以當(dāng)你拿到鑰匙進(jìn)去學(xué)習(xí)在出來把鑰匙掛墻上,你還想拿,老師馬上阻止你,干什么呢?去排隊(duì)去。所有同學(xué)都是這樣出來之后不能在拿鑰匙必須去排隊(duì)。此時(shí)就在數(shù)據(jù)安全的情況下,讓我們的同學(xué)按照一定的順序訪問鎖的資源。這就叫做線程同步!
線程同步的本質(zhì):當(dāng)我們?cè)谶M(jìn)行臨界資源安全訪問的前提下,讓多個(gè)線程按照一定的順序進(jìn)行資源訪問,這就是線程同步!
為了支持同步因此又有了條件變量!
在繼續(xù)談條件變量接口和使用之前,我們先談一談一個(gè)比較關(guān)鍵的概念。
生產(chǎn)者消費(fèi)者模型。生產(chǎn)消費(fèi)者模型談完我們?cè)谡剹l件變量,兩個(gè)概念談完我們把它們結(jié)合在一起最后寫一份基于Blcokqueue的生產(chǎn)者消費(fèi)者模型。
2.生產(chǎn)者消費(fèi)者模型
我們以生活中的例子來幫我們理解什么是生產(chǎn)消費(fèi)模型
我們?cè)谏钪薪?jīng)常要去超市購(gòu)買東西。
學(xué)生—消費(fèi)者
超市是生產(chǎn)者嗎?
咋一看好像是,但并不是,超市只是一個(gè)將東西聚集起來分發(fā)的交易場(chǎng)所,
各種商品的供貨商才是生產(chǎn)者
那如果你想買一根火腿腸,你為什么不能供貨商哪里去買呢?
你說供貨商你給我生產(chǎn)一根火腿腸我給你一塊錢你賣給我,供貨商是不會(huì)賣給你的,他啟動(dòng)機(jī)器生產(chǎn)都不值這個(gè)價(jià)。就靠這樣一人一次賣幾根似的賣貨,他虧死了,機(jī)器還要維護(hù)員工還要養(yǎng)。一般現(xiàn)實(shí)生活中這些工廠都比較距離人員較遠(yuǎn),而超市距離人員較近。這樣就由超市把各種需求結(jié)合在一起然后去找供貨商拿貨并且拿貨量比較大。所以超市就充當(dāng)了中間商。
現(xiàn)實(shí)生活供貨商把商品放到超市,消費(fèi)者再去超市銷售,二者間接通過超市進(jìn)行交易。
你今天去超市買火腿腸,那這個(gè)供貨商在干什么呢?
他可能在放假或者生產(chǎn)等等。
那供貨商在生產(chǎn),消費(fèi)者在干什么呢?
有可能吃火腿腸或者玩等等。
換句話說因?yàn)橛谐械拇嬖冢覀兊纳a(chǎn)過程和消費(fèi)過程就互相不怎么干擾了。
按計(jì)算機(jī)語(yǔ)言來說就是生產(chǎn)的過程和消費(fèi)的過程 — 解耦
超市是一個(gè)臨時(shí)保存產(chǎn)品的場(chǎng)所 — 緩沖區(qū)
正是因?yàn)橛芯彌_區(qū)的存在,生產(chǎn)出來的數(shù)據(jù)可以暫時(shí)放到超市里可以供消費(fèi)者一段時(shí)間的消費(fèi),同樣超市提供一大批展架可以供生產(chǎn)者生產(chǎn)。所以有了緩沖區(qū)可以讓生產(chǎn)消費(fèi)的步調(diào)并不怎么一致!
那有沒有寫代碼的時(shí)候,就沒有正兒八經(jīng)的解耦,也沒有提供緩存區(qū)的代碼呢?
有,函數(shù)調(diào)用!
調(diào)用方:生產(chǎn)了數(shù)據(jù)
形成變量:變量暫時(shí)保存數(shù)據(jù)
目標(biāo)函數(shù):消費(fèi)了數(shù)據(jù)
比如說main調(diào)用fun,main在調(diào)用fun的時(shí)候,main函數(shù)在干什么呢?
等待!什么都不做只能等fun函數(shù)調(diào)用結(jié)束返回。main和fun是強(qiáng)耦合的關(guān)系。
就比如不存在超市的情況,學(xué)生直接去找供貨商去買火腿腸,這個(gè)工廠沒有存貨只能開機(jī)器生產(chǎn),在生產(chǎn)的時(shí)候,學(xué)生只能等待!
下面我們結(jié)合上面的例子談一談生產(chǎn)消費(fèi)模式
超市能被學(xué)生去消費(fèi),也能被供貨商去生產(chǎn)。那超市是什么呢?
超市 — 共享資源
比如供貨商正在給超市架子上放火腿腸,學(xué)生就過來拿了。那能拿到嗎?
不一定!放火腿腸,要么就放,要么不放,不存在中間狀態(tài)!如果有中間狀態(tài)!那正在放時(shí),他當(dāng)前有沒有在放就直接決定了我能不能拿成功。所以他有沒有放是不確定的
所以生產(chǎn)和消費(fèi)在并發(fā)的訪問同一份資源的時(shí)候,就可能存在同時(shí)訪問的問題,就類似搶票的造成數(shù)據(jù)不一致問題。
生活中這種情況很少存在,但是當(dāng)兩個(gè)線程就有可能,比如一個(gè)線程正在放hello和world,另一個(gè)線程想拿world,但還沒有放完另一個(gè)線程就拿了,你怎么保證拿到的就是world?
下面我們就生產(chǎn)者和消費(fèi)者的關(guān)系來談一談
生產(chǎn)者和生產(chǎn)者之間什么關(guān)系呢?
生活中他倆是競(jìng)爭(zhēng)關(guān)系?。槭裁茨??超市貨架就那么多,放了你王中王就放不了雙匯
消費(fèi)者和消費(fèi)者之間什么關(guān)系呢?
生活中他倆是競(jìng)爭(zhēng)關(guān)系!比如世界末日超市就一根火腿腸,你倆不就是競(jìng)爭(zhēng)關(guān)系了
生產(chǎn)者和消費(fèi)者之間什么關(guān)系呢?
超市里有一個(gè)架子,生產(chǎn)者正在生產(chǎn)的時(shí)候你消費(fèi)者能拿嗎?答案是不能。就如剛才hello和world的問題。所以生產(chǎn)者和消費(fèi)者之間首先要保存數(shù)據(jù)安全!只允許一個(gè)人去訪問這份資源。還有比如你今天去買火腿腸但是超市火腿腸賣完了又被告知供貨商放假了,然后第二天又去超市問有沒有火腿腸超市還說沒有,這樣連續(xù)問了一個(gè)月,都告訴你沒有,那么你這是在干什么呢?當(dāng)前你在檢測(cè)這個(gè)火腿腸的就緒狀態(tài),但是你一直再問都沒有得到你想要的結(jié)果,第一這是浪費(fèi)你的時(shí)間,本來沒有貨你還來問,第二也浪費(fèi)了超市的時(shí)間,可能消費(fèi)者并不是只有你一個(gè)都來問。本來超市可以去找供貨商的。同理供貨商來問超市要不要火腿腸,超市說沒客人就不要你們就等等,供貨商就一直在詢問,同樣也在浪費(fèi)供貨商的時(shí)間。它們都沒錯(cuò),首先我們保證了資源的安全,只允許一個(gè)人去訪問,但是呢并不合理!假如你去超市買火腿腸,但超市暫時(shí)沒有,超市把你微信留下來等有了再告訴你,然后你再來,對(duì)供貨商也是一樣的做法,這樣不就合理了嗎,生產(chǎn)一部分消費(fèi)一部分。讓我們的生產(chǎn)和消費(fèi)協(xié)同起來。
總結(jié):
生產(chǎn)者和生產(chǎn)者之間:互斥
消費(fèi)者和消費(fèi)者之間:互斥
生產(chǎn)者和消費(fèi)者之間:互斥&&同步
下面我們把上面零零散散的知識(shí)做一個(gè)總結(jié)
生產(chǎn)者消費(fèi)者模型其實(shí)只需要記住記住一個(gè)原則就行了
"321"原則
3種關(guān)系:生產(chǎn)者和生產(chǎn)者(互斥),消費(fèi)者和消費(fèi)者(互斥),生產(chǎn)者和消費(fèi)者(互斥(保證共享資源的安全性)&&同步)
2種角色:生產(chǎn)者線程,消費(fèi)者線程
1個(gè)交易場(chǎng)所:一段特定結(jié)構(gòu)的緩沖區(qū)
未來只要我們想寫生產(chǎn)消費(fèi)模型,我們本質(zhì)工作其實(shí)就是維護(hù)321原則
生產(chǎn)消費(fèi)模型的特點(diǎn):
1.生產(chǎn)線程和消費(fèi)線程進(jìn)行解耦
2.支持生產(chǎn)和消費(fèi)的一段時(shí)間的忙閑不均的問題
3.提高效率
生產(chǎn)消費(fèi)提高效率,但是生產(chǎn)者和消費(fèi)者存在互斥關(guān)系??赡苌a(chǎn)者生產(chǎn)一個(gè)數(shù)據(jù)我們消費(fèi)者然后去拿,由于互斥的存在,生產(chǎn)者只能等待,同理消費(fèi)者來消費(fèi)的時(shí)候發(fā)現(xiàn)超市沒有商品,消費(fèi)者只能等生產(chǎn)者來生產(chǎn),換句話說只要我們維持嚴(yán)格的互斥關(guān)系,就退化成生產(chǎn)一個(gè)消費(fèi)一個(gè),消費(fèi)一個(gè)生產(chǎn)一個(gè),這樣能保證是高效的嗎?我們所說的高效到底體現(xiàn)到哪里?我們等下說。
到目前為止生產(chǎn)消費(fèi)原理我們告一段段落,目前線程我們有了,互斥我們也有了,接下來我們談?wù)勅绾螌?shí)現(xiàn)同步!
當(dāng)我們有了生產(chǎn)消費(fèi)的概念,有了多線程需要協(xié)同的要求,然后再學(xué)習(xí)條件變量讓我們多線程協(xié)同的技術(shù)時(shí),才更合理!
條件變量
- 當(dāng)一個(gè)線程互斥地訪問某個(gè)變量時(shí),它可能發(fā)現(xiàn)在其它線程改變狀態(tài)之前,它什么也做不了。
就好比今天在搶票,與搶票相對(duì)的就是放票,如果不放票,但是這些搶票的人也不怎么什么時(shí)候放票,其他人是不是只能瘋狂的申請(qǐng)鎖判斷票數(shù)是否大于0,大于0就搶,小于0就推出,所有人都在進(jìn)行刷票。
- 例如一個(gè)線程訪問隊(duì)列時(shí),發(fā)現(xiàn)隊(duì)列為空,它只能等待,直到其它線程將一個(gè)節(jié)點(diǎn)添加到隊(duì)列中在將該線程喚醒。這種情況就需要用到條件變量。
接下來我們見見條件變量的接口然后再舉例子理解一下
如果你想使用一個(gè)條件變量,你首先需要定義個(gè)條件變量
條件變量也是一個(gè)數(shù)據(jù)類型,是pthread庫(kù)給我們提供的
pthread_cond_t //數(shù)據(jù)類型
使用前要初始化
cond:要初始化的條件變量
attr:NULL
不用了就銷毀
如果條件變量是靜態(tài)或者全局的我們?nèi)缦鲁跏蓟?,就和互斥鎖一樣的做法。
目前我們?cè)L問臨界資源的寫法,就像搶票邏輯那樣
先加鎖
再判斷 —>生產(chǎn)和消費(fèi)條件是否滿足
最后解鎖
如果條件不滿足,就不要再申請(qǐng),而是將自己阻塞掛起!
如何掛起?條件變量給我們提供了這樣的函數(shù)
cond:要在這個(gè)條件變量上等待
mutex:互斥量,后面詳細(xì)解釋
同樣我們知道未來這個(gè)票數(shù)大于0了,我就可以去搶了,是不是也要把我叫醒啊,所以我們條件變量必然匹配了一個(gè)喚醒函數(shù)
將在指定條件變量下等待的線程盡快喚醒
喚醒一批
喚醒一個(gè)
接下來我們舉個(gè)例子理解一下條件變量
今天有很多人去面試,面試地點(diǎn)是一個(gè)公司的會(huì)議室。當(dāng)前面試官正在面試一個(gè)人,當(dāng)前能不能很多人都去這個(gè)會(huì)議室讓面試官面試呢?當(dāng)然不能,這個(gè)面試官是一份臨界資源必須要被互斥的訪問,所以一次只能有一人到房間去面試。但是這家公司組織的特別不好,當(dāng)前一個(gè)人面試完了讓下一個(gè)人去面試的時(shí)候,大家都說我先來的你先面我,這么多人在無序的情況對(duì)這份臨界資源展開競(jìng)爭(zhēng)了,這個(gè)面試官在里面面試他也不清楚,只能是根據(jù)就近原則。面試官再次面試完一個(gè)人,大家又是一窩蜂的說我先來的。可能面試管是一個(gè)臉盲,前一個(gè)人覺得自己面的不好,但是他是最近的所以他又去面試去了,導(dǎo)致整個(gè)面試就他一個(gè)人在瘋狂面試!到底面試效果并不好!
后來呢,有一個(gè)非常厲害的hr,這個(gè)hr管理能力很強(qiáng),hr立了一個(gè)牌子叫做等待區(qū),
讓所有等待面試的人都給我區(qū)等待區(qū)去排隊(duì)等,我只會(huì)按順序的從等待區(qū)叫下一個(gè)面試的人。
此時(shí)這個(gè)等待區(qū)就是我們的條件變量
換句話說未來所有應(yīng)聘者去等待面試時(shí),都必須去條件變量下去等,面試官去喚醒某個(gè)人時(shí),一定也是通過條件變量來喚醒等待的線程。
這個(gè)例子我們的理解是:當(dāng)條件不滿足的時(shí)候,我們線程必須去某些定義好的條件變量下進(jìn)行等待!
接下來我們?cè)谝砸粡垐D從內(nèi)核中理解這個(gè)條件變量
條件變量就是一個(gè)數(shù)據(jù)類型,假設(shè)它里面有一個(gè)狀態(tài)屬性,一個(gè)隊(duì)列
,我們也有很多的線程
當(dāng)線程申請(qǐng)某個(gè)資源但條件不就緒的情況下,這些線程都去隊(duì)列下排隊(duì)
當(dāng)條件滿足時(shí),就可以喚醒等待的線程,拿到CPU上開始調(diào)度
所以我們可以認(rèn)為條件變量帶一個(gè)隊(duì)列,條件不滿足時(shí),線程都去排隊(duì)等待。
所以我們剛剛學(xué)了兩個(gè)接口
下面我們先寫一個(gè)簡(jiǎn)單的案例用一用條件變量
#include <iostream>
#include <pthread.h>
#include <cstdio>
#include <string>
#include <unistd.h>
int ticket = 1000;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
using namespace std;
void *GetTicket(void *args)
{
string name = static_cast<const char *>(args);
while (true)
{
pthread_mutex_lock(&lock);
// 線程進(jìn)來都先排隊(duì)
pthread_cond_wait(&cond, &lock);//為什么要有l(wèi)ock,后面就說
if (ticket > 0)
{
usleep(1000);
cout << name << " 正在進(jìn)行搶票: " << ticket << endl;
ticket--;
pthread_mutex_unlock(&lock);
}
else
{
pthread_mutex_unlock(&lock);
break;
}
}
}
int main()
{
pthread_t t[5];
for (int i = 0; i < 5; ++i)
{
char *name = new char[64];
snprintf(name, 64, "thread %d", i + 1);
pthread_create(t+i, nullptr, GetTicket, name);
}
while (true)
{
sleep(1);
// 主線程一秒喚醒一個(gè)線程
pthread_cond_signal(&cond);
std::cout << "main thread wakeup one thread..." << std::endl;
}
for (int i = 0; i < 5; i++)
{
pthread_join(t[i], nullptr);
}
return 0;
}
加了條件變量,我們的線程就按照一定的順序進(jìn)行訪問同一份資源了。因?yàn)樗麄兌荚跅l件變量下排隊(duì)的。
while (true)
{
sleep(1);
// 一秒喚醒一個(gè)線程
//pthread_cond_signal(&cond);
//喚醒一批線程
pthread_cond_broadcast(&cond);
std::cout << "main thread wakeup one thread..." << std::endl;
}
現(xiàn)在我們左手有生產(chǎn)者消費(fèi)者模型,右手有互斥和同步,接下來我們?cè)趺窗阉鼈兘Y(jié)合在一起呢?
所以我們接下來寫一份基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
3.基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
BlockingQueue是一個(gè)阻塞隊(duì)列,首先它是一個(gè)隊(duì)列,既然是一個(gè)隊(duì)列就有為空的情況,同時(shí)我們對(duì)隊(duì)列設(shè)定一個(gè)上限。這時(shí)當(dāng)隊(duì)列為滿為空就要約束生產(chǎn)和消費(fèi)應(yīng)該阻塞住不應(yīng)該在生產(chǎn)和消費(fèi)了。這種我們就稱之為BlockQueue。
未來生產(chǎn)者一定是向BlockQueue里放數(shù)據(jù),此時(shí)BlockQueue就是一段特定結(jié)構(gòu)的緩沖區(qū),消費(fèi)者一定是向BlockQueu里取數(shù)據(jù)。
在多線程編程中阻塞隊(duì)列(Blocking Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu)。其與普通的隊(duì)列區(qū)別在于,當(dāng)隊(duì)列為空時(shí),從隊(duì)列獲取元素的操作將會(huì)被阻塞,直到隊(duì)列中被放入了元素;當(dāng)隊(duì)列滿時(shí),往隊(duì)列里存放元素的操作也會(huì)被阻塞,直到有元素被從隊(duì)列中取出(以上的操作都是基于不同的線程來說的,線程在對(duì)阻塞隊(duì)列進(jìn)程操作時(shí)會(huì)被阻塞)
要寫生產(chǎn)者和消費(fèi)者模式必須滿足321原則,不過我們剛開始學(xué),我們先寫單生產(chǎn)者和單消費(fèi)者維護(hù)它們之間的互斥和同步關(guān)系。后面代碼寫完我們?cè)谕贫鴱V之變成多生產(chǎn)者和多消費(fèi)者。
3種關(guān)系,先寫單生成,單消費(fèi)
2種角色,生產(chǎn)者線程,消費(fèi)者線程
1個(gè)交易場(chǎng)所,BlockQueue阻塞隊(duì)列
站在編程的角度,線程A往隊(duì)列中放,線程B往隊(duì)列中拿。這個(gè)隊(duì)列就是兩個(gè)線程的共享資源。線程A放的時(shí)候線程B不能拿,線程B拿的時(shí)候線程A不能放。隊(duì)列滿的時(shí)候生產(chǎn)者線程A就不能生產(chǎn)了,要想辦法讓線程A去等待,隊(duì)列空的時(shí)候消費(fèi)者線程B也不能拿了,也要想辦法讓線程B去等待。因此我們所學(xué)互斥鎖和條件變量都是需要的。
今天我們直接用C++的queue充當(dāng)我們的阻塞隊(duì)列。
上層調(diào)用邏輯大的框架我們先寫出來
#include"BlockQueue.hpp"
void* productor(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(true)
{
//生產(chǎn)活動(dòng),不斷向阻塞隊(duì)列中放
}
return nullptr;
}
void* consumer(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(true)
{
//消費(fèi)活動(dòng),不斷向阻塞隊(duì)列中取
}
return nullptr;
}
int main()
{
BlockQueue<int>* bq=new BlockQueue<int>();
pthread_t p,c;
//兩個(gè)線程看到同一個(gè)阻塞隊(duì)列
pthread_create(&p,nullptr,productor,bq);
pthread_create(&c,nullptr,consumer,bq);
pthread_join(p,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
阻塞隊(duì)列大的邏輯框架
#pragma onec
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
const int maxcapacity=5;
template<class T>
class BlockQueue
{
public:
BlockQueue(const int& _capacity=maxcapacity)
:_capacity(capacity)
{
}
//生產(chǎn)者放數(shù)據(jù)
void push(const T& in)//輸入型參數(shù),const &
{
}
//消費(fèi)者拿數(shù)據(jù)
void pop(T* out)//輸出型參數(shù),* //輸入輸出型 &
{
}
~BlockQueue()
{
}
private:
queue<T> _q;
int _capacity;//不能讓阻塞隊(duì)列無限擴(kuò)容,所以給一個(gè)最大容量表示隊(duì)列的上限
pthread_mutex_t _mutex;//阻塞隊(duì)列是一個(gè)共享資源,所以需要一把鎖把它保護(hù)起來
//生產(chǎn)者對(duì)應(yīng)的條件變量
pthread_cond_t _pcond;//隊(duì)列滿了,一定要讓生產(chǎn)者在對(duì)應(yīng)的條件變量下休眠
//消費(fèi)者對(duì)應(yīng)的條件變量
pthread_cond_t _ccond;//隊(duì)列空了,讓消費(fèi)者也在對(duì)應(yīng)條件變量下休眠
};
接下來把代碼寫完
#pragma onec
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
const int maxcapacity=5;
template<class T>
class BlockQueue
{
public:
BlockQueue(const int& capacity=maxcapacity)
:_capacity(capacity)
{
//構(gòu)造時(shí)初始化
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_pcond,nullptr);
pthread_cond_init(&_ccond,nullptr);
}
//生產(chǎn)者放數(shù)據(jù)
void push(const T& in)//輸入型參數(shù),const &
{
//放之前先加鎖保護(hù)共享資源,在加鎖和解鎖之間就是安全的臨界資源
pthread_mutex_lock(&_mutex);
//1.判滿
if(is_full())//bug?
{
//因?yàn)樯a(chǎn)條件不滿足,無法生產(chǎn),此時(shí)我們的生產(chǎn)者進(jìn)行等待
pthread_cond_wait(&_pcond,&_mutex);//_muext?
}
//2.走到這里一定是沒有滿
_q.push(in);
//3.絕對(duì)能保證,阻塞隊(duì)列里面一定有數(shù)據(jù)
pthread_cond_signal(&_ccond);//喚醒消費(fèi)者,這里可以有一定策略,比如說滿足三分之一在喚醒
pthread_mutex_unlock(&_mutex);
}
//消費(fèi)者拿數(shù)據(jù)
void pop(T* out)//輸出型參數(shù),* //輸入輸出型 &
{
//這里也要加鎖,因?yàn)橐WC訪問同一份資源是安全,所以用的是同一把鎖
pthread_mutex_lock(&_mutex);
//1.判空
if(is_empty())//bug?
{
pthread_cond_wait(&_ccond,&_mutex);//_mutex?
}
//2.走到這里我們能保證,一定不為空
*out=_q.front();
_q.pop();
//3.絕對(duì)能保證,阻塞隊(duì)列里面至少有一個(gè)空的位置
pthread_cond_signal(&_pcond);//這里可以有一定策略
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
//析構(gòu)時(shí)銷毀
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
bool is_full()
{
return _q.size()==_capacity;
}
bool is_empty()
{
return _q.empty();
}
private:
queue<T> _q;
int _capacity;//不能讓阻塞隊(duì)列無限擴(kuò)容,所以給一個(gè)最大容量表示隊(duì)列的上限
pthread_mutex_t _mutex;//阻塞隊(duì)列是一個(gè)共享資源,所以需要一把鎖把它保護(hù)起來
//生產(chǎn)者對(duì)應(yīng)的條件變量
pthread_cond_t _pcond;//隊(duì)列滿了,一定要讓生產(chǎn)者在對(duì)應(yīng)的條件變量下休眠
//消費(fèi)者對(duì)應(yīng)的條件變量
pthread_cond_t _ccond;//隊(duì)列空了,讓消費(fèi)者也在對(duì)應(yīng)條件變量下休眠
};
#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>
void* productor(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(true)
{
//生產(chǎn)活動(dòng)
int data=rand()%10+1;//在這里先用隨機(jī)數(shù).構(gòu)建一個(gè)數(shù)據(jù)
bq->push(data);
cout<<"生產(chǎn)數(shù)據(jù): "<<data<<endl;
sleep(1);//生產(chǎn)的慢一些
}
return nullptr;
}
void* consumer(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(true)
{
//消費(fèi)活動(dòng)
int data;
bq->pop(&data);
cout<<"消費(fèi)數(shù)據(jù): "<<data<<endl;
}
return nullptr;
}
int main()
{
//隨機(jī)數(shù)種子
srand((unsigned int)time(nullptr));
BlockQueue<int>* bq=new BlockQueue<int>();
pthread_t p,c;
//兩個(gè)線程看到同一個(gè)阻塞隊(duì)列
pthread_create(&p,nullptr,productor,bq);
pthread_create(&c,nullptr,consumer,bq);
pthread_join(p,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
如何證明這是一份生產(chǎn)者消費(fèi)者模型呢?
我們先讓生產(chǎn)者慢一點(diǎn)生產(chǎn),生產(chǎn)一個(gè)消費(fèi)一個(gè)
在讓消費(fèi)者慢一點(diǎn),看到生產(chǎn)一堆,而消費(fèi)者只能消費(fèi)一個(gè),不過消費(fèi)的是歷史數(shù)據(jù),消費(fèi)之后生產(chǎn)者才能繼續(xù)生產(chǎn)
void* consumer(void* args)
{
BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(args);
while(true)
{
//消費(fèi)活動(dòng)
int data;
bq->pop(&data);
cout<<"消費(fèi)數(shù)據(jù): "<<data<<endl;
sleep(1);//消費(fèi)的慢一些
}
}
代碼寫了測(cè)試也都通過了,但是這份代碼還有很多細(xì)節(jié)需要我們雕琢的地方。
以生產(chǎn)為例
細(xì)節(jié)一
首先加鎖然后最后才是解鎖,在判斷滿的生產(chǎn)條件不滿足被掛起,但是掛起的時(shí)候可是在臨界區(qū)中被掛起,如果我掛起期間還持有鎖,那其他線程也進(jìn)不來。
因此pthread_cond_wait這個(gè)函數(shù)第二個(gè)參數(shù),必須是我們正在使用的互斥鎖!
a.該函調(diào)用的時(shí)候,會(huì)以原子性的方式將鎖釋放,并將自己掛起
b.該函數(shù)在被喚醒返回的時(shí)候,會(huì)自動(dòng)的重新獲取你傳入的鎖
如果當(dāng)前醒來鎖沒有獲取成功,你也必須是處于競(jìng)爭(zhēng)鎖的狀態(tài),直到獲取鎖成功了這個(gè)函數(shù)才會(huì)返回。換言之只要這個(gè)函數(shù)返回了這個(gè)鎖一定獲取成功了。
細(xì)節(jié)二
當(dāng)前判斷生產(chǎn)條件不滿足就把自己掛起,但是這有個(gè)問題pthread_cond_wait這是一個(gè)函數(shù),只要是函數(shù)就有調(diào)用失敗的可能。
另外還存在偽喚醒的情況,假設(shè)只有一個(gè)消費(fèi)者,十個(gè)生產(chǎn)者。只消費(fèi)了一個(gè)但是卻喚醒了一批。但是你這里是if判斷,都去push肯定是有問題的。
因此充當(dāng)條件判斷的語(yǔ)法必須是while,不能用if
void push(const T& in)//輸入型參數(shù),const &
{
//放之前先加鎖保護(hù)共享資源,在加鎖和解鎖之間就是安全的臨界資源
pthread_mutex_lock(&_mutex);
//1.判滿
while(is_full())
{
//因?yàn)樯a(chǎn)條件不滿足,無法生產(chǎn),此時(shí)我們的生產(chǎn)者進(jìn)行等待
pthread_cond_wait(&_pcond,&_mutex);
}
//2.走到這里一定是沒有滿
_q.push(in);
//3.絕對(duì)能保證,阻塞隊(duì)列里面一定有數(shù)據(jù)
pthread_cond_signal(&_ccond);//喚醒消費(fèi)者,這里可以有一定策略,比如說滿足三分之一在喚醒
pthread_mutex_unlock(&_mutex);
}
//消費(fèi)者拿數(shù)據(jù)
void pop(T* out)//輸出型參數(shù),* //輸入輸出型 &
{
//這里也要加鎖,因?yàn)橐WC訪問同一份資源是安全,所以用的是同一把鎖
pthread_mutex_lock(&_mutex);
//1.判空
while(is_empty())
{
pthread_cond_wait(&_ccond,&_mutex);
}
//2.走到這里我們能保證,一定不為空
*out=_q.front();
_q.pop();
//3.絕對(duì)能保證,阻塞隊(duì)列里面至少有一個(gè)空的位置
pthread_cond_signal(&_pcond);//這里可以有一定策略
pthread_mutex_unlock(&_mutex);
}
細(xì)節(jié)三
pthread_cond_signal這個(gè)函數(shù),可以放在臨界區(qū)內(nèi)部,也可以放在外部。
也就是說這個(gè)喚醒可以放在解鎖之前也可以放在解鎖之后。但是一般建議放在里面。
因?yàn)椴魂P(guān)心誰拿到鎖,只關(guān)心有人生產(chǎn)消費(fèi)。
下面我們修改一下代碼,這樣生產(chǎn)消費(fèi)數(shù)據(jù)太low了,所以我們寫了模板可以放任意內(nèi)容。我們寫一個(gè)任務(wù)。
//Task.hpp
#pragma once
#include <iostream>
#include <functional>
#include <string>
using namespace std;
class Task
{
typedef function<int(int, int,char)> func_t;
public:
Task(){}
Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
{}
// 把任務(wù)返回去可以看到
string operator()()
{
int result = _callback(_x, _y, _op);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
return buffer;
}
// 把生產(chǎn)的任務(wù)也打印出來
string toTaskString()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
return buffer;
}
private:
int _x;
int _y;
char _op; // 對(duì)應(yīng)+-*/%操作
func_t _callback; // 回調(diào)函數(shù)
};
string oper="+-*/%";
// 回調(diào)函數(shù)
int mymath(int x, int y, char op)
{
int result=0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
cout << "div zero error" << endl;
result = -1;
}
else
{
result = x / y;
}
}
break;
case '%':
{
if (y == 0)
{
cout << "mod zero error" << endl;
result = -1;
}
else
{
result = x % y;
}
}
break;
default:
break;
}
return result;
}
#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>
#include"Task.hpp"
void* productor(void* args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
while(true)
{
//生產(chǎn)活動(dòng)
int x=rand()%10+1;
int y=rand()%5;
char op=oper[rand()%oper.size()];
Task t(x,y,op,mymath);
bq->push(t);
cout<<"生產(chǎn)任務(wù): "<<t.toTaskString()<<endl;
sleep(1);//生產(chǎn)的慢一些
}
}
void* consumer(void* args)
{
BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(args);
while(true)
{
Task t;
bq->pop(&t);
cout<<"消費(fèi)任務(wù): "<<t()<<endl;
}
}
int main()
{
//隨機(jī)數(shù)種子
srand((unsigned int)time(nullptr));
BlockQueue<Task>* bq=new BlockQueue<Task>();
pthread_t p,c;
//兩個(gè)線程看到同一個(gè)阻塞隊(duì)列
pthread_create(&p,nullptr,productor,bq);
pthread_create(&c,nullptr,consumer,bq);
pthread_join(p,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
現(xiàn)在我還想把需求變一變,我讓一個(gè)線程來生產(chǎn)派發(fā)任務(wù),另一個(gè)線程來消費(fèi)處理任務(wù),再來一個(gè)線程記錄任務(wù)結(jié)果,將結(jié)果記錄在文件中!該怎么辦呢?
再來一個(gè)阻塞隊(duì)列!
#pragma once
#include <iostream>
#include <functional>
#include <string>
using namespace std;
class CallTask
{
typedef function<int(int, int, char)> func_t;
public:
CallTask() {}
CallTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
{
}
// 把任務(wù)返回去可以看到
string operator()()
{
int result = _callback(_x, _y, _op);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
return buffer;
}
// 把生產(chǎn)的任務(wù)也打印出來
string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op; // 對(duì)應(yīng)+-*/%操作
func_t _callback; // 回調(diào)函數(shù)
};
string oper = "+-*/%";
// 回調(diào)函數(shù)
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
cout << "div zero error" << endl;
result = -1;
}
else
{
result = x / y;
}
}
break;
case '%':
{
if (y == 0)
{
cout << "mod zero error" << endl;
result = -1;
}
else
{
result = x % y;
}
}
break;
default:
break;
}
return result;
}
class SaveTask
{
typedef function<void(string)> func_t;
public:
SaveTask(){}
SaveTask(string messages,func_t func):_messages(messages),_func(func)
{}
void operator()()
{
_func(_messages);
}
private:
string _messages;
func_t _func;
};
void Save(string messages)
{
string target="./log.txt";
FILE* fp=fopen(target.c_str(),"a+");
if(!fp)
{
cout<<"fopen error"<<endl;
return;
}
fprintf(fp,"%s\n",messages.c_str());
fclose(fp);
}
#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>
#include"Task.hpp"
//C 計(jì)算
//S 存儲(chǔ)
template<class C,class S>
class BlockQueues
{
public:
BlockQueue<C>* c_bq;
BlockQueue<S>* s_bq;
};
void* productor(void* args)
{
BlockQueue<CallTask>* _c_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->c_bq;
while(true)
{
//生產(chǎn)活動(dòng)
int x=rand()%10+1;
int y=rand()%5;
char op=oper[rand()%oper.size()];
CallTask t(x,y,op,mymath);
_c_bq->push(t);
cout<<"productor thread, 生產(chǎn)計(jì)算任務(wù): "<<t.toTaskString()<<endl;
sleep(1);//生產(chǎn)的慢一些
}
}
void* consumer(void* args)
{
BlockQueue<CallTask>* _c_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->c_bq;
BlockQueue<SaveTask>* _s_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->s_bq;
while(true)
{
CallTask t;
_c_bq->pop(&t);
cout<< "cal thread, 完成計(jì)算任務(wù): "<<t()<<endl;
SaveTask s(t(),Save);
_s_bq->push(s);
cout<< "cal thread, 推送存儲(chǔ)任務(wù)完成..." <<t()<<endl;
//sleep(1);//消費(fèi)的慢一些
}
}
void* saver(void* args)
{
BlockQueue<SaveTask>* _s_bq=(static_cast<BlockQueues<CallTask,SaveTask>*>(args))->s_bq;
while(true)
{
SaveTask s;
_s_bq->pop(&s);
s();
cout<<"save thread, 保存任務(wù)完成..."<<endl;
}
}
int main()
{
//隨機(jī)數(shù)種子
srand((unsigned int)time(nullptr));
//寫一個(gè)類把兩個(gè)阻塞隊(duì)列都傳過去
BlockQueues<CallTask,SaveTask>* bq=new BlockQueues<CallTask,SaveTask>();
bq->c_bq=new BlockQueue<CallTask>();
bq->s_bq=new BlockQueue<SaveTask>();
pthread_t p,c,s;
pthread_create(&p,nullptr,productor,bq);
pthread_create(&c,nullptr,consumer,bq);
pthread_create(&s,nullptr,saver,bq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
pthread_join(s,nullptr);
delete bq->c_bq;
delete bq->s_bq;
return 0;
}
到目前為止,我們寫的都是一個(gè)生產(chǎn)者一個(gè)消費(fèi)者,我想寫多生產(chǎn)者多消費(fèi)者直接把我們的代碼改成多個(gè)生產(chǎn)者多個(gè)消費(fèi)者可以嗎?
#include"BlockQueue.hpp"
#include<ctime>
#include<unistd.h>
#include"Task.hpp"
void* productor(void* args)
{
BlockQueue<CallTask>* _c_bq=static_cast<BlockQueue<CallTask>*>(args);
while(true)
{
//生產(chǎn)活動(dòng)
int x=rand()%10+1;
int y=rand()%5;
char op=oper[rand()%oper.size()];
CallTask t(x,y,op,mymath);
_c_bq->push(t);
cout<<"productor thread, 生產(chǎn)計(jì)算任務(wù): "<<t.toTaskString()<<endl;
sleep(1);//生產(chǎn)的慢一些
}
}
void* consumer(void* args)
{
BlockQueue<CallTask>* _c_bq=static_cast<BlockQueue<CallTask>*>(args);
while(true)
{
//消費(fèi)活動(dòng)
CallTask t;
_c_bq->pop(&t);
cout<< "cal thread, 完成計(jì)算任務(wù): "<<t()<<endl;
}
}
int main()
{
//隨機(jī)數(shù)種子
srand((unsigned int)time(nullptr));
BlockQueue<CallTask>* bq=new BlockQueue<CallTask>();
pthread_t p[3],c[2];
for(int i=0;i<3;++i)
{
pthread_create(p+i,nullptr,productor,bq);
}
for(int i=0;i<2;++i)
{
pthread_create(c+i,nullptr,consumer,bq);
}
for(int i=0;i<3;++i)
{
pthread_join(p[i],nullptr);
}
for(int i=0;i<2;++i)
{
pthread_join(c[i],nullptr);
}
return 0;
}
當(dāng)然可以,不管是多個(gè)生產(chǎn)者還是消費(fèi)者進(jìn)入生產(chǎn)和消費(fèi)的代碼之前都必須先加鎖,都要先去競(jìng)爭(zhēng)鎖,只有持有鎖的才能進(jìn)入阻塞隊(duì)列當(dāng)中,阻塞隊(duì)列中同時(shí)永遠(yuǎn)只有一個(gè)線程再跑!
接下來我們的問題就變成了,在一個(gè)阻塞隊(duì)列中有多個(gè)生產(chǎn)者,有多個(gè)消費(fèi)者,但是因?yàn)殒i的存在,所以任何一個(gè)時(shí)刻只有一個(gè)執(zhí)行流在隊(duì)列中放或者拿。
1.那你創(chuàng)建多線程生產(chǎn)和消費(fèi)的意義是什么??
2.生產(chǎn)消費(fèi)模型高效在哪里?
對(duì)于生產(chǎn)者它構(gòu)建任務(wù)從那來的?
今天我們寫的代碼下x,y是為了測(cè)試隨便寫的,可是未來這個(gè)x,y大概率是從數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)、外設(shè)拿來的用戶數(shù)據(jù),然后構(gòu)建任務(wù)push到任務(wù)隊(duì)列中,生產(chǎn)者線程構(gòu)建任務(wù)是要花費(fèi)時(shí)間的!!
對(duì)于消費(fèi)者線程來說,把任務(wù)pop從任務(wù)隊(duì)列中拿出來,今天我們的加減乘除很簡(jiǎn)單,有沒有可能未來后序我們處理任務(wù)非常耗時(shí)??!
把任務(wù)拿出來多個(gè)消費(fèi)者線程的高效體現(xiàn)在一個(gè)線程從任務(wù)隊(duì)列中拿出任務(wù)之后做計(jì)算,那另一個(gè)線程也可以拿出然后繼續(xù)在計(jì)算,然后再另一個(gè)線程拿在計(jì)算,所以高效并不是體現(xiàn)在從隊(duì)列中拿數(shù)據(jù)高效,而是讓一個(gè)或多個(gè)線程可以同時(shí)并發(fā)的計(jì)算多個(gè)任務(wù),在計(jì)算的同時(shí)還不影響其他線程從任務(wù)隊(duì)列中拿任務(wù)的過程。
同樣的生產(chǎn)者線程獲取任務(wù)的成本很高,你這個(gè)線程獲取成功了你可以把任務(wù)放到任務(wù)隊(duì)列中,但你這個(gè)線程在放任務(wù)的同時(shí),其他線程可以同步的并發(fā)的從外部中拿任務(wù)。
所以生產(chǎn)者消費(fèi)者模型高效在哪里,答案是生產(chǎn)者消費(fèi)者模式并不高效在隊(duì)列中拿放,而是在生產(chǎn)之前和消費(fèi)之后,讓線程并行執(zhí)行??!
同樣生產(chǎn)者消費(fèi)者的意義也不再隊(duì)列中,而是在放之前同時(shí)生產(chǎn),拿之后同時(shí)消費(fèi)。
接下來我們找找看我們的代碼有沒有不足的地方
1.一個(gè)線程,在操作臨界資源的時(shí)候,必須臨界資源是滿足條件的!
2.可是,公共資源是否滿足生產(chǎn)或者消費(fèi)條件,我們無法直接得知(我們不能事先得知(在沒有訪問之前無法得知))
3.只能先加鎖,再檢測(cè),再操作,再解鎖
為什么要先加鎖呢?因?yàn)槟阋獧z測(cè)的本質(zhì)也是在訪問臨界資源!
總而言之就是,因?yàn)槲覀冊(cè)诓僮髋R界資源的時(shí)候,有可能不就緒,但是我們無法提前得知,所以只能先加鎖,在檢測(cè),根據(jù)檢測(cè)結(jié)果,決定下一步怎么走!這是我們剛才寫代碼的邏輯。文章來源:http://www.zghlxwxcb.cn/news/detail-778600.html
那有沒有一種方法在實(shí)際操作的時(shí)候就把對(duì)應(yīng)的資源情況得知呢?
下一篇文章細(xì)說!文章來源地址http://www.zghlxwxcb.cn/news/detail-778600.html
到了這里,關(guān)于【linux】線程同步+基于BlockingQueue的生產(chǎn)者消費(fèi)者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!