目錄
一、信號(hào)量
1、概念
2、信號(hào)量操作函數(shù)
二、基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
1、模型分析
2、代碼實(shí)現(xiàn)
1、單生產(chǎn)單消費(fèi)的生產(chǎn)者消費(fèi)者模型
2、多生產(chǎn)多消費(fèi)的生產(chǎn)者消費(fèi)者模型
一、信號(hào)量
1、概念
引入:前面我們講到了,對臨界資源進(jìn)行訪問時(shí),為了保證數(shù)據(jù)的一致性,我們需要對臨界資源進(jìn)行加鎖保護(hù)。當(dāng)我們用一個(gè)互斥鎖對臨界資源進(jìn)行保護(hù)時(shí),相當(dāng)于我們將這塊臨界資源看作一個(gè)整體,同一時(shí)刻只允許一個(gè)執(zhí)行流對這塊臨界資源進(jìn)行訪問。這樣做非常合理,但是效率太低了。
但是,我們最理想的方案,其實(shí)是:如果在同一時(shí)刻,不同的執(zhí)行流要訪問的是臨界資源的不同區(qū)域,那么我們是允許它們同時(shí)進(jìn)行臨界資源的訪問,這樣就大大提高了效率。
比如,如果一個(gè)臨界資源是一個(gè)大小為10數(shù)組,我們可以對其加鎖保護(hù)??墒?,現(xiàn)在來看,如果有10個(gè)線程同時(shí)訪問這個(gè)數(shù)組,可以嗎?當(dāng)然可以,只要這10個(gè)線程在同一時(shí)刻訪問的是數(shù)組的不同位置,即10個(gè)位置一個(gè)線程訪問一個(gè)。這樣我們就可以讓多個(gè)執(zhí)行流同時(shí)訪問臨界資源了。
這時(shí),我們就使用了信號(hào)量來幫助我們實(shí)現(xiàn)這個(gè)方案。
信號(hào)量:信號(hào)量的本質(zhì)是一個(gè)計(jì)數(shù)器,通常用來表示臨界資源中,資源數(shù)的多少。申請信號(hào)量實(shí)際上就是對臨界資源的預(yù)定機(jī)制。信號(hào)量主要用于同步和互斥。
每個(gè)執(zhí)行流在進(jìn)入臨界區(qū)之前都應(yīng)該先申請信號(hào)量,申請成功就有了訪問臨界資源的權(quán)限,當(dāng)訪問完畢后就應(yīng)該釋放信號(hào)量。?
信號(hào)量的PV操作:
~? P操作:我們將申請信號(hào)量稱為P操作。申請信號(hào)量的本質(zhì)就是申請獲得臨界資源中某塊資源的訪問權(quán)限,當(dāng)申請成功時(shí)臨界資源中資源的數(shù)目應(yīng)該減一,因此P操作的本質(zhì)就是讓信號(hào)量減一。
~? V操作:我們將釋放信號(hào)量稱為V操作。釋放信號(hào)量的本質(zhì)就是歸還臨界資源中某塊資源的訪問權(quán)限,當(dāng)釋放成功時(shí)臨界資源中資源的數(shù)目就應(yīng)該加一,因此V操作的本質(zhì)就是讓信號(hào)量加一。
2、信號(hào)量操作函數(shù)
sem_init:初始化信號(hào)量的函數(shù),該函數(shù)的函數(shù)原型如下:返回值,初始化信號(hào)量成功返回0,失敗返回-1。
NAME
sem_init - initialize an unnamed semaphore
SYNOPSIS
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
Link with -pthread.
參數(shù)說明:
sem:需要初始化的信號(hào)量。
pshared:傳入0值表示線程間共享,傳入非零值表示進(jìn)程間共享。
value:信號(hào)量的初始值。
sem_destroy:銷毀信號(hào)量的函數(shù),該函數(shù)的函數(shù)原型如下:
NAME
sem_destroy - destroy an unnamed semaphore
SYNOPSIS
#include <semaphore.h>
int sem_destroy(sem_t *sem);
Link with -pthread.
參數(shù)說明:sem:需要銷毀的信號(hào)量。
返回值:銷毀信號(hào)量成功返回0,失敗返回-1。
sem_wait:等待信號(hào)量(申請信號(hào)量)的函數(shù),該函數(shù)的函數(shù)原型如下:
NAME
sem_wait, sem_timedwait, sem_trywait - lock a semaphore
SYNOPSIS
#include <semaphore.h>
int sem_wait(sem_t *sem);
Link with -pthread.
參數(shù):sem:需要等待的信號(hào)量。
返回值:等待信號(hào)量成功返回0,信號(hào)量的值減一。等待信號(hào)量失敗返回-1,信號(hào)量的值保持不變。
sem_post:釋放信號(hào)量(發(fā)布信號(hào)量)的函數(shù),該函數(shù)的函數(shù)原型如下:
NAME
sem_post - unlock a semaphore
SYNOPSIS
#include <semaphore.h>
int sem_post(sem_t *sem);
Link with -pthread.
參數(shù):sem:需要釋放的信號(hào)量。
返回值:釋放信號(hào)量成功返回0,信號(hào)量的值加一。釋放信號(hào)量失敗返回-1,信號(hào)量的值保持不變。
有了對信號(hào)量的各種操作,我們下面來通過一個(gè)具體的例子來使用一下他們。
二、基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
1、模型分析
實(shí)際上并不是真正的環(huán)形隊(duì)列,因?yàn)槲覀儧]有這種數(shù)據(jù)結(jié)構(gòu),它的實(shí)現(xiàn)是通過數(shù)組模擬的,當(dāng)數(shù)據(jù)加入到最后的位置時(shí)直接模等于數(shù)組的大小即可,這樣就可以回到數(shù)組的起始位置。
我們在對環(huán)形隊(duì)列進(jìn)行訪問時(shí),當(dāng)隊(duì)列為空或者為滿,生產(chǎn)者和消費(fèi)者就會(huì)指向同一個(gè)位置,這時(shí)我們就需要生產(chǎn)者和消費(fèi)者互斥和同步了,如果為空,讓生產(chǎn)者先訪問,為滿就讓消費(fèi)者先訪問。
而當(dāng)隊(duì)列不為空,也不為滿時(shí),生產(chǎn)者和消費(fèi)者可以訪問隊(duì)列不同的位置,以實(shí)現(xiàn)并發(fā)。這樣就可以提高效率了。
為了實(shí)現(xiàn)消費(fèi)者和生產(chǎn)者的并發(fā)訪問,我們需要使用信號(hào)量。我們使用信號(hào)量來表示隊(duì)列中相關(guān)資源的個(gè)數(shù)。
1、對于生產(chǎn)者,在意的是隊(duì)列中的空間資源,只要有空間生產(chǎn)者就可以進(jìn)行生產(chǎn)。空間資源定義成一個(gè)生產(chǎn)者需要的信號(hào)量(space_sem),在初始化時(shí),它的初始值我們應(yīng)該設(shè)置為環(huán)形隊(duì)列的容量,因?yàn)閯傞_始時(shí)環(huán)形隊(duì)列當(dāng)中全是空間。
2、對于消費(fèi)者,在意的是隊(duì)列中的數(shù)據(jù)資源,只要有數(shù)據(jù)消費(fèi)者就可以進(jìn)行消費(fèi)。數(shù)據(jù)資源定義成一個(gè)消費(fèi)者需要的信號(hào)量(data_sem),在初始化時(shí),它的初始值我們應(yīng)該設(shè)置為0,因?yàn)閯傞_始時(shí)環(huán)形隊(duì)列當(dāng)中沒有數(shù)據(jù)。
當(dāng)生產(chǎn)者線程生產(chǎn)數(shù)據(jù)時(shí),它需要先申請信號(hào)量,即對space_sem進(jìn)行P操作,然后生產(chǎn)數(shù)據(jù),放入到隊(duì)列中。生產(chǎn)完成后,這時(shí),隊(duì)列中就多出了一個(gè)數(shù)據(jù)資源,需要對data_sem進(jìn)行V操作。
當(dāng)消費(fèi)者線程消費(fèi)數(shù)據(jù)時(shí),它也需要先申請信號(hào)量,即對data_sem進(jìn)行P操作,然后消費(fèi)數(shù)據(jù)。消費(fèi)完成后,隊(duì)列中就會(huì)多出一個(gè)空間資源,需要對space_sem進(jìn)行V操作。
2、代碼實(shí)現(xiàn)
對信號(hào)量進(jìn)行封裝:sem.hpp
#pragma once
#include <iostream>
#include <semaphore.h>
using namespace std;
class sem
{
public:
sem(int sem)
{
sem_init(&sem_, 0, sem);
}
void p()
{
sem_wait(&sem_);
}
void v()
{
sem_post(&sem_);
}
~sem()
{
sem_destroy(&sem_);
}
private:
sem_t sem_;
};
1、單生產(chǎn)單消費(fèi)的生產(chǎn)者消費(fèi)者模型
RingQueue.hpp:環(huán)形隊(duì)列的實(shí)現(xiàn):
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include <sys/types.h>
#include <unistd.h>
using namespace std;
#define GVAL 5
template <class T>
class RingQueue
{
public:
RingQueue(const int num = GVAL)
: num_(num), rq_(num), space_sem(num), data_sem(0), p_step(0), c_step(0)
{
}
void push(const T &in)
{
space_sem.p(); // 申請信號(hào)量
rq_[p_step++] = in;
p_step %= num_;
data_sem.v(); // 釋放信號(hào)量
}
void pop(T *out)
{
data_sem.p(); // 申請信號(hào)量
*out = rq_[c_step++];
c_step %= num_;
space_sem.v(); // 釋放信號(hào)量
}
~RingQueue()
{
}
private:
vector<T> rq_;
int num_; // 大小
sem space_sem; // 空間資源信號(hào)量
sem data_sem; // 數(shù)據(jù)資源信號(hào)量
int c_step; // 消費(fèi)者下標(biāo)
int p_step; // 生產(chǎn)者下標(biāo)
};
單生產(chǎn)單消費(fèi)的生產(chǎn)者消費(fèi)者模型:
#include <iostream>
#include <pthread.h>
#include "RingQueue.hpp"
using namespace std;
void *consumer(void *arg)
{
RingQueue<int> *rq = (RingQueue<int> *)arg;
while(true)
{
int x;
rq->pop(&x);
cout << "消費(fèi):" << x << endl;
sleep(1);
}
}
void *productor(void *arg)
{
RingQueue<int> *rq = (RingQueue<int> *)arg;
while(true)
{
int x = rand() % 100 + 1;
rq->push(x);
cout << "生產(chǎn):" << x << endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid() ^ 2321);
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, consumer, (void *)rq);
pthread_create(&p, nullptr, productor, (void *)rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
運(yùn)行結(jié)果:
2、多生產(chǎn)多消費(fèi)的生產(chǎn)者消費(fèi)者模型
我們只要保證,最終進(jìn)入臨界區(qū)的是一個(gè)生產(chǎn)者,一個(gè)消費(fèi)就行,即生產(chǎn)者和生產(chǎn)者之間是互斥的,消費(fèi)者和消費(fèi)者之間是互斥的。所以我們需要提供兩把鎖。
RingQueue.hpp:
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include <sys/types.h>
#include <unistd.h>
using namespace std;
#define GVAL 5
template <class T>
class RingQueue
{
public:
RingQueue(const int num = GVAL)
: num_(num), rq_(num), space_sem(num), data_sem(0), p_step(0), c_step(0)
{
pthread_mutex_init(&clock, nullptr);
pthread_mutex_init(&plock, nullptr);
}
void push(const T &in)
{
space_sem.p(); // 申請信號(hào)量
pthread_mutex_lock(&plock);
rq_[p_step++] = in;
p_step %= num_;
pthread_mutex_unlock(&plock);
data_sem.v(); // 釋放信號(hào)量
}
void pop(T *out)
{
data_sem.p(); // 申請信號(hào)量
pthread_mutex_lock(&clock);
*out = rq_[c_step++];
c_step %= num_;
pthread_mutex_unlock(&clock);
space_sem.v(); // 釋放信號(hào)量
}
~RingQueue()
{
pthread_mutex_destroy(&clock);
pthread_mutex_destroy(&plock);
}
private:
vector<T> rq_;
int num_; // 大小
sem space_sem; // 空間資源信號(hào)量
sem data_sem; // 數(shù)據(jù)資源信號(hào)量
int c_step; // 消費(fèi)者下標(biāo)
int p_step; // 生產(chǎn)者下標(biāo)
pthread_mutex_t plock;
pthread_mutex_t clock;
};
#include <iostream>
#include <pthread.h>
#include "RingQueue.hpp"
using namespace std;
void *consumer(void *arg)
{
RingQueue<int> *rq = (RingQueue<int> *)arg;
while (true)
{
int x;
rq->pop(&x);
cout << "消費(fèi):" << x << " " << pthread_self() << endl;
sleep(1);
}
}
void *productor(void *arg)
{
RingQueue<int> *rq = (RingQueue<int> *)arg;
while (true)
{
int x = rand() % 100 + 1;
rq->push(x);
cout << "生產(chǎn):" << x << " " << pthread_self() << endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid() ^ 2321);
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c[2], p[3];
for (int i = 0; i < 2; i++)
pthread_create(c + i, nullptr, consumer, (void *)rq);
for (int i = 0; i < 3; i++)
pthread_create(p + i, nullptr, productor, (void *)rq);
for (int i = 0; i < 2; i++)
pthread_join(c[i], nullptr);
for (int i = 0; i < 3; i++)
pthread_join(p[i], nullptr);
return 0;
}
運(yùn)行結(jié)果:文章來源:http://www.zghlxwxcb.cn/news/detail-854347.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-854347.html
到了這里,關(guān)于Linux之信號(hào)量 | 消費(fèi)者生產(chǎn)者模型的循環(huán)隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!