喜歡的點贊,收藏,關注一下把!
1.POSIX信號量
上篇文章最后我們基于BlockQueue生產(chǎn)者消費者模型寫了代碼,測試什么的都通過了。最后我們說代碼還有一些不足的地方,由這些不足從而引入了我們接下來要學的信號量!
我們在看一看不足的地方
1.一個線程,在操作臨界資源的時候,必須臨界資源是滿足條件的!
2.可是,公共資源是否滿足生產(chǎn)或者消費條件,我們無法直接得知(我們不能事先得知(在沒有訪問之前無法得知))
3.只能先加鎖,再檢測,再操作,再解鎖
為什么要先加鎖呢?因為你要檢測的本質(zhì)也是在訪問臨界資源!
總而言之就是,因為我們在操作臨界資源的時候,有可能不就緒,但是我們無法提前得知,所以只能先加鎖,在檢測,根據(jù)檢測結果,決定下一步怎么走!這是我們剛才寫代碼的邏輯。
那有沒有一種方法在實際操作的時候就把對應的資源情況得知呢?
有的,信號量!
關于信號量我們以前學過,今天我們真正的用起來。
首先回答什么是信號量
再說信號量之前在說一個東西
只要我們對資源進行整體加鎖,就默認了我們對這個資源整體使用。這點要注意。
但是實際情況可能存在,一份公共資源,但是允許同時訪問不同的區(qū)域!
就好比我們之前在信號量說過的電影院的例子,電影院在我們現(xiàn)實生活中是由我們共享的,它的工作模式是可以同時讓我們?nèi)プ阶约旱淖簧?,真正的競爭關系只在于訪問同一個座位時存在沖突。
下面說說什么是信號量
a.信號量本質(zhì)是一把計數(shù)器
之前說過電影院的例子是這樣說的,每一個想進入電影院看電影的時候,是不是只有你屁股坐到電影院某個座位上這個座位才屬于你,是這樣的嗎?并不是,只要我先買票,票買到了雖然人還沒有坐到電影院的某個座位上,即使很長時間沒有過去,但是我心里很清楚,只要我票買了這個座位就會給我預留著。所以我們對應申請信號量的動作,起始就相當于購買電影票的概念。也就是說未來想訪問這一份公共資源或者公共資源的某一區(qū)域,所有線程都必須遵守先申請信號量!只有申請成功的線程,就一定能保證在這份公共資源里一定有一個位置給你預留著讓你去訪問。那一定有一個位置給你預留讓你去訪問怎么保證呢?這是由程序員編碼保證不同的線程可以并發(fā)訪問公共資源的不同區(qū)域!
什么意思呢?就是說我對一份公共資源進行劃分,比如是數(shù)組,進來之前先申請信號量,申請成功的線程,程序員通過編碼保證讓不同的線程能夠訪問公共資源的不同區(qū)域。
因為信號量是一把計數(shù)器,它是衡量臨界資源中資源數(shù)量多少的計數(shù)器
所以比如說電影院有100個座位,每一個人進入電影票之前你得先買票,而票最多100張,不管最后100個人最后怎么坐,至少我保證進入電影院的人不會超過100個,所以對100個人經(jīng)過合理的規(guī)劃讓他們坐到不同的座位上,不就不會出現(xiàn)沖突嗎,并且還會并發(fā)訪問這個資源。
所以在真正訪問臨界資源前,先申請信號量這是第一點。
b.只要擁有信號量,就在未來一定能夠擁有臨界資源的一部分,所以申請信號量的本質(zhì):對臨界資源中特定小塊資源的 預定機制
這就對應去電影院買票看電影,你買到票了但你根本就沒有去看電影,你很清楚只要我把電影院票買到了,我在未來想看的時候一定有這個座位給我留著。所以買票的本質(zhì)是對座位的預定機制。
所以未來訪問公共資源需要先申請信號量。
先申請信號量,信號量本質(zhì)是一把計數(shù)器,這意味著什么?
是不是通過這兩點,有可能我們在訪問真正的臨界之前,我們其實就可以提前知道臨界資源的使用情況?。?!
基于這一點就有可能解決以前那種先加鎖在檢測臨界資源,而現(xiàn)在不用給我檢測臨界資源了,你給我檢測信號量就行了,在檢測信號量期間這一份公共資源不會被訪問。
只要申請成功,就一定有你的資源!
只要申請失敗,就說明條件不就緒,你只能等!
所以我們就可以將信號量的申請成功或失敗,直接或間接判斷資源就緒的情況,所以我們就不再判斷了。
總結一些,其實就是我們發(fā)現(xiàn)之前的代碼根本原因就是你根本就不知道這個臨界資源中資源的使用情況所以你只能自己先加鎖再檢測,能生產(chǎn)就生產(chǎn)不能就把自己掛起,但今天每一個線程都必須先申請信號量,信號量是一把計數(shù)器是衡量臨界資源數(shù)量多少的計數(shù)器,它也是對資源的預定機制,所以你先不要著急訪問臨界資源也不要著急判斷,你直接去申請信號量,只要你申請成功這個資源就一定是就緒的你直接生產(chǎn)就行了,申請失敗那對不起這個資源并不滿足你只能等待,而不管成功還是失敗你都沒有去訪問臨界資源,所以通過這個的方式把公共資源的數(shù)目先暴露出來,來確定資源有沒有就緒。這就是信號量!
什么是信號量:信號量的本質(zhì)是一把計數(shù)器,它是用來對臨界資源某一塊資源的預定機制。
為什么要有信號量:在訪問臨界資源之前通過信號量的方式得知資源的使用情況。
按照我們這種說法
線程要訪問臨界資源中的某一區(qū)域 ---- 先申請信號量 ---- 所有人必須要先看到信號量 ---- 信號量本身必須是:公共資源
信號量是一把計數(shù)器,那它匹配的一定有 遞增 or 遞減
偽代碼,假設sem_tsem=10;
sem- -(信號量- -的操作) ----> 申請資源 ----> 必須保證操作的原子性 ---->
P操作
sem++(信號量++的操作) ----> 歸還資源 ----> 必須保證操作的原子性 ---->
V操作
信號量核心操作:PV原語
接下來看看信號量的基本使用接口(怎么辦?)
初始化信號量
sem_t //信號量類型,這是由pthead庫為我們維護的信號量
sem:定義的信號量
pshared:0表示線程間共享,非零表示進程間共享
value:該信號量計數(shù)器的初始值,這個值完全取決于臨界資源數(shù)目的多少
銷毀信號量
等待信號量
對信號量計數(shù)器減一,就是信號量對應匹配的P操作
發(fā)布信號量
表示資源使用完畢,可以歸還資源了。將信號量值加1。對應信號量匹配的V操作
2.基于環(huán)形隊列的生產(chǎn)消費模型
目前關于信號量我們學了50%,下面我們找一個場景寫代碼來幫我們更深層次的理解它。
在數(shù)據(jù)結構初階我們學過環(huán)形隊列,今天這里我們就不在關注它是如何實現(xiàn)的,而關注點在于基于環(huán)形隊列的生產(chǎn)消費模型!
我們也知道環(huán)形隊列在物理上是一個數(shù)組,可以用數(shù)組來實現(xiàn)。
關于環(huán)形隊列就回顧到這里。
那環(huán)形隊列在生產(chǎn)者消費者這里我們怎么用它。
單生產(chǎn)單消費為例
既然是一個環(huán)形隊列,未來一定有個生產(chǎn)者在放數(shù)據(jù),有個消費者在拿數(shù)據(jù)。最開始它們倆一定指向同一個位置。
對于生產(chǎn)者和消費者而言,它們倆在什么情況也會訪問同一個位置呢?
- 空的時候
- 滿的時候
- 其他情況,生產(chǎn)者和消費者,根本訪問的就是不同的區(qū)域!
下面我們舉個例子
今天我們想玩一個游戲,一張桌子,桌子有N個空的盤子
然后你和我手拉手到了這張桌子的旁邊,我們走到這個桌子時,桌子上N個空盤子,
我們倆玩的是你追我跑的游戲,我不斷的向盤子里放一個蘋果,放一個之后立馬就往后跑。你要直接把蘋果從盤子上拿走然后你覺得不好吃,然后你繼續(xù)追我。盤子里不能放兩個或兩個以上的蘋果,你也不能從空盤子里假裝拿到了蘋果,我們在完追逐游戲的時候,怎么樣保證游戲正常的運行呢?我們有如下幾個原則!
- 你不能超過我
- 我不能把你套一個圈以上
- 我們倆什么時候,會站在一起?
a . 盤子全為空
b. 盤子上全都是蘋果(滿)
c.其他情況,我們倆指向的是不同的位置!
盤子全為空我們倆站在一起讓誰先運行呢?
我(生產(chǎn)者)
盤子全都是蘋果我們倆又站在一起了這次讓誰先運行呢?
你(消費者)
根據(jù)上面的例子我們有了一個結論:
在環(huán)形隊列中,大部分情況,單生產(chǎn)者和單消費是可以并發(fā)執(zhí)行的!
只有在滿或者空的時候,才會有互斥與同步的問題?。?/strong>
對應到我們剛剛的說的環(huán)形隊列中,桌子以及一個個盤子就是我們的環(huán)形隊列,我叫做生產(chǎn)者線程,你叫做消費者線程。生產(chǎn)者和消費者只有在空滿的時候訪問同一個位置,其他都訪問的是不同區(qū)域,所以當然可以并發(fā)了。
所以為了完成環(huán)形隊列生產(chǎn)消費問題,我們要做的核心工作是什么?
你—>消費者
我—>生產(chǎn)者
- 你不能超過我
- 我不能把你套一個圈以上
- 我們倆什么時候,會站在一起?
那如何保證上面的性質(zhì)呢?
就用我們剛學的信號量!
信號量是用來衡量臨界資源中資源數(shù)量的!
以前我們從來沒有談論過資源的問題。即使在阻塞隊列中我們所說的資源就是這個隊列,我們是把隊列當做整體來使用的。因為我們加了鎖所以我們認為隊列是資源。但你在想想資源這個東西是不是各花入各眼。
1.對于生產(chǎn)者而言,看中的是什么?
隊列中剩余的空間
2.對于消費者而言,看中的是什么?
放入隊列中的數(shù)據(jù)
所以為了更好衡量生產(chǎn)者和消費者,我們給空間資源定義一個信號量,給數(shù)據(jù)資源定義一個信號量。
有了這個信號量空間資源有多少剩余多少,數(shù)據(jù)資源有多少剩余多少,計數(shù)器給你確定,要生成和消費都先申請信號量,空間信號量只要申請成功環(huán)形隊列一定有個位置給你生產(chǎn),同樣資源信號量只要申請成功環(huán)形隊列一定有個數(shù)據(jù)給你消費。所以去訪問就行了,至于訪問那一個由我們程序員寫代碼來確定。
接下來寫一些偽代碼幫助理解
生產(chǎn)者你想進入環(huán)形隊列,對不起先不要著急,你先進行P操作,申請信號量。
申請成功,你就可以繼續(xù)向下運行。
申請失敗,當前執(zhí)行流,阻塞在申請?zhí)?br>
申請成功并且把數(shù)據(jù)放到隊列中之后,就要進行V操作,這里有一個問題生產(chǎn)者把數(shù)據(jù)生產(chǎn)到隊列中,它要走了,那么它曾經(jīng)申請的信號量在它生產(chǎn)結束之后歸還了嗎?答案是并沒有!因為把數(shù)據(jù)生產(chǎn)到隊列中你走了但你生產(chǎn)者生產(chǎn)的數(shù)據(jù)依舊在你曾經(jīng)申請的資源空格里面放著,你人走了數(shù)據(jù)還在,數(shù)據(jù)還在格子就依舊被占用。 可是我還是做一下V操作啊,雖然我走了但是畢竟環(huán)形隊列中多了一個數(shù)據(jù),多了一個數(shù)據(jù)不就是多了一個消費者看中的資源嗎,所以此時V不應該V的prodoctor_sem的信號量而是consumer_sem的信號量!
此時完成了一個生產(chǎn)過程。
對于消費者而言你首先也要進行P操作,申請信號量資源,你要申請的是消費者看中的數(shù)據(jù)資源。申請成功往下走從事消費活動。申請失敗阻塞同樣阻塞在申請?zhí)帯?br>
從事消費活動之后也要進行V操作,當消費者把隊列中數(shù)據(jù)消費了走了數(shù)據(jù)已經(jīng)不在了,那消費者申請的信號量數(shù)據(jù)有還嗎?并沒有!但是消費者把隊列中數(shù)據(jù)拿走了,隊列中的位置不就空出來一個嗎。這意味著消費者消費一個數(shù)據(jù)那可以供我們生產(chǎn)的位置資源不就多了一個嗎。所以消費者V的是prodoctor_sem
最開始環(huán)形隊列是空的,生產(chǎn)者線程和消費者線程并發(fā)誰先運行呢?
答案是誰先運行不確定,但一定是誰申請成功誰往下運行!
當隊列為空的是,一定是生產(chǎn)者申請成功繼續(xù)往下執(zhí)行代碼。因為消費者最開始資源計數(shù)器為0只能是申請失敗阻塞等待!當生產(chǎn)者執(zhí)行完V操作之后,消費者consumer_sem一定由0->1。那消費者P操作一定成功繼續(xù)往后運行。
所以當隊列為空的時候,一定能保證只有一個執(zhí)行流進來 ,一定只能保證生產(chǎn)者先運行!
假設極端場景,消費者就不消費,那么生產(chǎn)者能不能把消費者套一個圈,繞過消費者在繼續(xù)生產(chǎn)呢?
不可以!因為信號量是一個計數(shù)器,生產(chǎn)者連續(xù)消費prodoctor_sem一直在減減,雖然consumer_sem一直在加加但是先不消費,一直生產(chǎn)生產(chǎn)最后prodoctor_sem減到0 ,它還想在生產(chǎn)還能申請到信號量嗎?并不能!所以即便是消費者不消費,生產(chǎn)者隨便生產(chǎn),對不起你生產(chǎn)滿了你在申請空間資源的時候你申請不到了,你就會阻塞掛起!所以生產(chǎn)者不能把消費者套一個圈
假設現(xiàn)在已經(jīng)生產(chǎn)滿了,
此時生產(chǎn)線程和消費線程誰先運行我們也不確定,但是生產(chǎn)一定先阻塞在申請?zhí)?,消費申請資源成功繼續(xù)往下運行。
所以滿的時候生產(chǎn)和消費同時到來,生產(chǎn)一定申請不成功,進不到臨界資源去訪問。消費一定先運行!
生產(chǎn)者一直不生產(chǎn),那有沒有可能消費者把數(shù)據(jù)消費完了最終超過生產(chǎn)者呢?
并不會,消費完數(shù)據(jù)了已經(jīng)沒有數(shù)據(jù)可以消費了。消費者再去申請時只能阻塞掛起了!只能等生產(chǎn)者生產(chǎn)。所以消費者不能超過生產(chǎn)者
只有空滿的時候我們倆會在同一個位置,你不能超過我,我不能把你套一個圈以上,除此之外生產(chǎn)者和消費者都是并發(fā)執(zhí)行訪問不同位置!
這些都是由信號量計數(shù)器來保證的!
未來,生產(chǎn)和消費的位置我們要想清楚。
1.其實就是隊列中的下標
2.一定是兩個下標
3.為空或者為滿,下標相同
通過下標讓生產(chǎn)和消費訪問不同的位置,另外通過下標給我們的線程指派要訪問的資源。
接下來我們寫代碼實現(xiàn)環(huán)形隊列生產(chǎn)消費模型
上層調(diào)用邏輯大的框架
#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
void* productor(void* args)
{
ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
while(true)
{
}
}
void* consumer(void* args)
{
ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
while(true)
{
}
}
int main()
{
//隨機數(shù)種子,這里為了更隨機
srand((unsigned int)time(nullptr)^getpid());
ringqueue<int>* rq=new ringqueue<int>();
pthread_t p,c;
pthread_create(&p,nullptr,productor,rq);
pthread_create(&c,nullptr,consumer,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
環(huán)形隊列大的邏輯框架
#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>
using namespace std;
const int maxcapacity=5;
template<class T>
class ringqueue
{
public:
ringqueue(int capacity=maxcapacity):_queue(capacity),_capacity(capacity)
{
//空間資源初始化為環(huán)形隊列大小
sem_init(&_spacesem,0,_capacity);
//數(shù)據(jù)資源初始化為0
sem_init(&_datasem,0,0);
_pstep=_cstep=0;
}
//生產(chǎn)
void push(const T& in)
{
}
//消費
void pop(T* out)
{
}
~ringqueue()
{
//銷毀
sem_destroy(&_spacesem);
sem_destroy(&_datasem);
}
private:
vector<T> _queue;//模擬環(huán)形隊列
int _capacity;//隊列的大小,不能無線擴容
sem_t _spacesem;//生產(chǎn)者生產(chǎn)看中的空間資源(信號量)
sem_t _datasem;//消費者消費看中的數(shù)據(jù)資源(信號量)
int _pstep;//生產(chǎn)者下標
int _cstep;//消費者下標
};
接下來我們把代碼補充完整
#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
void* productor(void* args)
{
ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
while(true)
{
//生產(chǎn)活動
//version1
int data=rand()&10+1;
rq->push(data);
cout<<"生產(chǎn)完成,生產(chǎn)的數(shù)據(jù)是: "<<data<<endl;
}
}
void* consumer(void* args)
{
ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
while(true)
{
//消費活動
//version1
int data;
rq->pop(&data);
cout<<"消費完成,消費的數(shù)據(jù)是: "<<data<<endl;
}
}
int main()
{
//隨機數(shù)種子,這里為了更隨機
srand((unsigned int)time(nullptr)^getpid());
ringqueue<int>* rq=new ringqueue<int>();
pthread_t p,c;
pthread_create(&p,nullptr,productor,rq);
pthread_create(&c,nullptr,consumer,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
現(xiàn)在有個問題未來生產(chǎn)者生產(chǎn)滿了還能不能讓生產(chǎn)者生產(chǎn)嗎?
不能,消費者把數(shù)據(jù)消費完了也不能在消費了。所以理論上它倆也有同步的過程,滿了不能再生產(chǎn)空了不能在消費。這樣是不是阻塞隊列很像啊,但是它和阻塞隊列不同的點是,如果生產(chǎn)和消費不是同一個位置,它們是可以同時生產(chǎn)和消費的。
#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>
#include<cassert>
using namespace std;
const int maxcapacity=5;
template<class T>
class ringqueue
{
public:
ringqueue(int capacity=maxcapacity):_queue(capacity),_capacity(capacity)
{
//空間資源初始化為環(huán)形隊列大小
int n=sem_init(&_spacesem,0,_capacity);
assert(n == 0);
//數(shù)據(jù)資源初始化為0
n=sem_init(&_datasem,0,0);
assert(n == 0);
_pstep=_cstep=0;
}
void push(const T& in)
{
//1.申請空間資源P操作, 成功意味著我一定能進行正常的生產(chǎn),失敗阻塞掛起
P(_spacesem);
//2.往對應生產(chǎn)下標處生產(chǎn)
_queue[_pstep++]=in;
_pstep%=_capacity;//為了是一個環(huán)形的
//3.環(huán)形隊列多了一個消費資源
V(_datasem);
}
void pop(T* out)
{
P(_datasem);//申請成功,意味著一定能進行正常的消費
*out=_queue[_cstep++];//從對應的消費下標處消費
_cstep%=_capacity;
V(_spacesem);//環(huán)形隊列多了一個空間資源
}
~ringqueue()
{
//銷毀
sem_destroy(&_spacesem);
sem_destroy(&_datasem);
}
private:
void P(sem_t& sem)//對信號量做--
{
int n=sem_wait(&sem);
assert(n == 0);
(void)n;
}
void V(sem_t& sem)//對信號量做++
{
int n=sem_post(&sem);
assert(n == 0);
(void)n;
}
private:
vector<T> _queue;//模擬環(huán)形隊列
int _capacity;//隊列的大小,不能無線擴容
sem_t _spacesem;//生產(chǎn)者生產(chǎn)看中的空間資源(信號量)
sem_t _datasem;//消費者消費看中的數(shù)據(jù)資源(信號量)
int _pstep;//生產(chǎn)者下標
int _cstep;//消費者下標
};
未來生產(chǎn)和消費誰先運行不知道,但是在初始時我們把空間資源信號量置為最大值,數(shù)據(jù)資源信號量置為0。最開始生產(chǎn)者和消費者同時到來 ,對不起只有生產(chǎn)者P操作成功,消費者P操作失敗所以消費者只能等。所以為空只能是生產(chǎn)者先生產(chǎn)。而滿的時候,只能是消費者消費生產(chǎn)者只能等。
void* productor(void* args)
{
ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
while(true)
{
//生產(chǎn)活動
//version1
int data=rand()&10+1;
rq->push(data);
cout<<"生產(chǎn)完成,生產(chǎn)的數(shù)據(jù)是: "<<data<<endl;
sleep(1);//生產(chǎn)慢一點
}
}
void* consumer(void* args)
{
ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
while(true)
{
//消費活動
//version1
int data;
rq->pop(&data);
cout<<"消費完成,消費的數(shù)據(jù)是: "<<data<<endl;
sleep(1);//消費慢一點
}
環(huán)形隊列生產(chǎn)和消費的過程一定是可以并行的,因為只有當空和滿的時候才會只有一個執(zhí)行流。不過我們這里看不出來。
下面我們再把在阻塞隊列寫的任務拿過去,讓環(huán)形隊列跑看看效果
#pragma once
#include <iostream>
#include <functional>
#include <string>
using namespace std;
class Task
{
typedef function<int(int, int, char)> func_t;
public:
Task() {}
Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
{
}
// 把任務返回去可以看到
string operator()()
{
int result = _callback(_x, _y, _op);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
return buffer;
}
// 把生產(chǎn)的任務也打印出來
string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
return buffer;
}
private:
int _x;
int _y;
char _op; // 對應+-*/%操作
func_t _callback; // 回調(diào)函數(shù)
};
string oper = "+-*/%";
// 回調(diào)函數(shù)
int mymath(int x, int y, char op)
{
int result = 0;
switch (op)
{
case '+':
result = x + y;
break;
case '-':
result = x - y;
break;
case '*':
result = x * y;
break;
case '/':
{
if (y == 0)
{
cout << "div zero error" << endl;
result = -1;
}
else
{
result = x / y;
}
}
break;
case '%':
{
if (y == 0)
{
cout << "mod zero error" << endl;
result = -1;
}
else
{
result = x % y;
}
}
break;
default:
break;
}
return result;
}
#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"Task.hpp"
void* productor(void* args)
{
//ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);
while(true)
{
//生產(chǎn)活動
//version1
// int data=rand()&10+1;
// rq->push(data);
// cout<<"生產(chǎn)完成,生產(chǎn)的數(shù)據(jù)是: "<<data<<endl;
//sleep(1);//生產(chǎn)慢一點
//version2
//構建or獲取任務
int x=rand()%10+1;
int y=rand()%5;
char op=oper[rand()%oper.size()];
Task t(x,y,op,mymath);
//生產(chǎn)任務
rq->push(t);
//輸出提示
cout<<"生產(chǎn)者派發(fā)了一個任務: "<<t.toTaskString()<<endl;
sleep(1);
}
}
void* consumer(void* args)
{
//ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);
while(true)
{
//消費活動
//version1
// int data;
// rq->pop(&data);
// cout<<"消費完成,消費的數(shù)據(jù)是: "<<data<<endl;
// sleep(1);//消費慢一點
//vecrsion2
Task t;
//消費任務
rq->pop(&t);
cout<<"消費者消費了一個任務: "<<t()<<endl;
}
}
int main()
{
//隨機數(shù)種子,這里為了更隨機
srand((unsigned int)time(nullptr)^getpid());
ringqueue<Task>* rq=new ringqueue<Task>();
pthread_t p,c;
pthread_create(&p,nullptr,productor,rq);
pthread_create(&c,nullptr,consumer,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
目前我們寫的代碼都是單生成單消費,那多生產(chǎn)多消費該怎么寫呢?
根據(jù)321原則,生產(chǎn)與生產(chǎn)互斥關系,消費與消費互斥關系,生產(chǎn)與消費互斥同步,目前只維護了生產(chǎn)和消費的互斥與同步關系。那生產(chǎn)與生產(chǎn),消費與消費之前關系該如何維護呢?不然就寫不出多生產(chǎn)多消費!
在以前我們的阻塞隊列中無論是多個生產(chǎn)者還是多少消費者無論在任何時刻只有一個線程在阻塞隊列中訪問臨界資源。今天我們是環(huán)形隊列的生產(chǎn)消費模型,生產(chǎn)生產(chǎn)消費消費也是互斥關系,所以無論無何在任何時刻是不是只允許一個生產(chǎn)者或者一個消費者先進入到臨界區(qū)里面,就是讓生產(chǎn)者之間先競爭選出一個勝利者,消費者之間競爭選出一個勝利者,然后生產(chǎn)者和消費者勝利的這個人在結合隊列是不是空的慢的,能不能并發(fā),然后確定如何生產(chǎn)消費。
所以說在環(huán)形隊列中多生產(chǎn)多消費,只要保證,最終進入臨界區(qū)的一個是生產(chǎn),一個是消費就行了。
所以我們需要加鎖,并且是兩把鎖,一把生產(chǎn)者競爭用的鎖,一把消費者競爭用的鎖。你們內(nèi)部之間先競爭處一個勝出者然后在進行臨界資源的訪問。
#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>
#include<cassert>
using namespace std;
const int maxcapacity=5;
template<class T>
class ringqueue
{
public:
ringqueue(int capacity=maxcapacity):_queue(capacity),_capacity(capacity)
{
//空間資源初始化為環(huán)形隊列大小
int n=sem_init(&_spacesem,0,_capacity);
assert(n == 0);
//數(shù)據(jù)資源初始化為0
n=sem_init(&_datasem,0,0);
assert(n == 0);
pthread_mutex_init(&_plock,nullptr);
pthread_mutex_init(&_clock,nullptr);
_pstep=_cstep=0;
}
void push(const T& in)
{
pthread_mutex_lock(&_plock);
//1.申請空間資源P操作, 成功意味著我一定能進行正常的生產(chǎn),失敗阻塞掛起
P(_spacesem);
//2.往對應生產(chǎn)下標處生產(chǎn)
_queue[_pstep++]=in;
_pstep%=_capacity;//為了是一個環(huán)形的
//3.環(huán)形隊列多了一個消費資源
V(_datasem);
pthread_mutex_unlock(&_plock);
}
void pop(T* out)
{
pthread_mutex_lock(&_clock);
P(_datasem);//申請成功,意味著一定能進行正常的消費
*out=_queue[_cstep++];//從對應的消費下標處消費
_cstep%=_capacity;
V(_spacesem);//環(huán)形隊列多了一個空間資源
pthread_mutex_unlock(&_clock);
}
~ringqueue()
{
//銷毀
sem_destroy(&_spacesem);
sem_destroy(&_datasem);
pthread_mutex_destroy(&_plock);
pthread_mutex_destroy(&_clock);
}
private:
void P(sem_t& sem)//對信號量做--
{
int n=sem_wait(&sem);
assert(n == 0);
(void)n;
}
void V(sem_t& sem)//對信號量做++
{
int n=sem_post(&sem);
assert(n == 0);
(void)n;
}
private:
vector<T> _queue;//模擬環(huán)形隊列
int _capacity;//隊列的大小,不能無線擴容
sem_t _spacesem;//生產(chǎn)者生產(chǎn)看中的空間資源(信號量)
sem_t _datasem;//消費者消費看中的數(shù)據(jù)資源(信號量)
int _pstep;//生產(chǎn)者下標
int _cstep;//消費者下標
pthread_mutex_t _plock;//生產(chǎn)者的鎖
pthread_mutex_t _clock;//消費者的鎖
};
我們先測試一下再說
#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"Task.hpp"
string Selfname()
{
char name[64];
snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
return name;
}
void* productor(void* args)
{
//ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);
while(true)
{
//生產(chǎn)活動
//version1
// int data=rand()&10+1;
// rq->push(data);
// cout<<"生產(chǎn)完成,生產(chǎn)的數(shù)據(jù)是: "<<data<<endl;
//sleep(1);//生產(chǎn)慢一點
//version2
//構建or獲取任務
int x=rand()%10+1;
int y=rand()%5;
char op=oper[rand()%oper.size()];
Task t(x,y,op,mymath);
//生產(chǎn)任務
rq->push(t);
//輸出提示
//cout<<"生產(chǎn)者派發(fā)了一個任務: "<<t.toTaskString()<<endl;
cout<<Selfname()<<" ,生產(chǎn)者派發(fā)了一個任務: "<<t.toTaskString()<<endl;
sleep(1);
}
}
void* consumer(void* args)
{
//ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);
while(true)
{
//消費活動
//version1
// int data;
// rq->pop(&data);
// cout<<"消費完成,消費的數(shù)據(jù)是: "<<data<<endl;
// sleep(1);//消費慢一點
//vecrsion2
Task t;
//消費任務
rq->pop(&t);
//cout<<"消費者消費了一個任務: "<<t()<<endl;
cout<<Selfname()<<" ,消費者消費了一個任務: "<<t()<<endl;
}
}
int main()
{
//隨機數(shù)種子,這里為了更隨機
srand((unsigned int)time(nullptr)^getpid());
//ringqueue<int>* rq=new ringqueue<int>();
ringqueue<Task>* rq=new ringqueue<Task>();
pthread_t p[4],c[2];
for(int i=0;i<4;++i)
{
pthread_create(p+i,nullptr,productor,rq);
}
for(int i=0;i<2;++i)
{
pthread_create(c+i,nullptr,consumer,rq);
}
for(int i=0;i<4;++i)
{
pthread_join(p[i],nullptr);
}
for(int i=0;i<2;++i)
{
pthread_join(c[i],nullptr);
}
return 0;
}
效果雖然不明顯,但是確實是多生產(chǎn)多消費。
但是我們的加鎖邏輯有一些問題,這個加鎖有沒有優(yōu)化的可能
你認為:先加鎖,后申請信號量,還是先申請信號量,在加鎖?
我們維護的互斥關系是為了保護信號量嗎?信號量大手一揮瞥了瞥眼同情的看這這把鎖,你保護我干什么,我的申請本來就是原子的,我申請又不會出問題你保護我干啥呢?
我們是先申請鎖然后在申請信號量等到走完之后才釋放了鎖,而另一個生產(chǎn)者也只能是先申請鎖在申請信號量,那么在你持有鎖之間我可不可以先把信號量指派給不同的線程呢?
這種先申請鎖在申請信號量一定比較低,因為先申請鎖意味著后序的線程,只能在鎖上等,當持有鎖線程把鎖釋放了,后序線程才能去申請信號量。
因此正確寫法如下
void push(const T& in)
{
//1.申請空間資源P操作, 成功意味著我一定能進行正常的生產(chǎn),失敗阻塞掛起
P(_spacesem);
pthread_mutex_lock(&_plock);
//2.往對應生產(chǎn)下標處生產(chǎn)
_queue[_pstep++]=in;
_pstep%=_capacity;//為了是一個環(huán)形的
//3.環(huán)形隊列多了一個消費資源
pthread_mutex_unlock(&_plock);
V(_datasem);
}
void pop(T* out)
{
P(_datasem);//申請成功,意味著一定能進行正常的消費
pthread_mutex_lock(&_clock);
*out=_queue[_cstep++];//從對應的消費下標處消費
_cstep%=_capacity;
pthread_mutex_unlock(&_clock);
V(_spacesem);//環(huán)形隊列多了一個空間資源
}
這樣反過來,就相當于讓每個線程先去領取任務,領到任務成功之后進去就一個個進。但是在你一個線程在里面做訪問的時候,我可以在外面不斷的派發(fā)任務,這樣加鎖就是一個線程進去的時候其他線程不是干等,還有信號量嗎我拿一個,還有信號量嗎我拿一個。因為是串行的,這樣進去是一個一個進去的并且訪問的是不同的下標。
問:
多線程訪問環(huán)形隊列時,進入隊列最多幾個線程?最少幾個線程?
最多有兩個線程,生產(chǎn)者和消費者同時并發(fā)的訪問。
最少只有一個線程,隊列為空為滿的時候。
但無論什么時候,最多只有一個生產(chǎn)者一個消費者進入這個隊列中。
問:多生產(chǎn)多消費的效率高在哪里?(阻塞隊列說過)
獲取or構建任務這個是要花時間,并不像我們寫的那樣隨便來個隨機數(shù)就行了,將來可能會從外設,網(wǎng)絡,文件,數(shù)據(jù)庫里讀取數(shù)據(jù)的。
消費的時候也是要花時間的,并不是把任務從環(huán)形隊列中拿到就完了,后面可能大量的時間在處理這個任務。
所以多生產(chǎn)多效果高效在生產(chǎn)之前消費之后可以并發(fā)執(zhí)行獲取任務和處理任務!文章來源:http://www.zghlxwxcb.cn/news/detail-790069.html
至此我們的環(huán)形隊列結束了。下一篇我們根據(jù)在線程這塊所學的知識寫一個線程池!文章來源地址http://www.zghlxwxcb.cn/news/detail-790069.html
到了這里,關于【linux】POSIX信號量+基于環(huán)形隊列的生產(chǎn)消費模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!