目錄
一.為何要使用生產(chǎn)者消費者模型
?二.生產(chǎn)者消費者模型優(yōu)點
?三.基于BlockingQueue的生產(chǎn)者消費者模型
1.BlockingQueue——阻塞隊列
2.實現(xiàn)代碼
?四.POSIX信號量
五.基于環(huán)形隊列的生產(chǎn)消費模型
一.為何要使用生產(chǎn)者消費者模型
生產(chǎn)者消費者模式就是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。這個阻塞隊列就是用來給生產(chǎn)者和消費者解耦的。
?
?二.生產(chǎn)者消費者模型優(yōu)點
- 解耦:生產(chǎn)者和消費者不直接解除,無需關(guān)心對方的情況,僅僅自己與緩沖區(qū)解除。
- 支持并發(fā):并發(fā)的體現(xiàn)并不在于多個消費者同時從緩沖區(qū)中拿數(shù)據(jù),也不是多個生產(chǎn)者同時從緩沖區(qū)放數(shù)據(jù),而是消費者在處理拿到的數(shù)據(jù)的時候,生產(chǎn)者可以繼續(xù)向緩沖區(qū)放數(shù)據(jù)。
- 支持忙閑不均 :當(dāng)生產(chǎn)者生產(chǎn)過快的時候,可以讓生產(chǎn)者慢下來,當(dāng)消費者消費過快的時候,可以讓消費者慢下來。
?三.基于BlockingQueue的生產(chǎn)者消費者模型
?1.BlockingQueue——阻塞隊列
在多線程編程中阻塞隊列(Blocking Queue)是一種常用于實現(xiàn)生產(chǎn)者和消費者模型的數(shù)據(jù)結(jié)構(gòu)。其與普通的隊列區(qū)別在于,當(dāng)隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;當(dāng)隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進程操作時會被阻塞)。
2.實現(xiàn)代碼
#include <iostream>
#include <string>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
using namespace std;
template <class T>
class BlockQueue
{
public:
BlockQueue(size_t cap)
: _cap(cap)
{
// 初始化條件變量
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
}
void push(T date)
{
// 將任務(wù)push進去隊列,多線程加鎖,每一只能一個線程push任務(wù)
pthread_mutex_lock(&_mutex);
while (_q.size() == _cap) // 如果隊列已經(jīng)滿了,生產(chǎn)者要被阻塞
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(date);
// 當(dāng)push任務(wù)成功的時候,需要將喚醒消費者來處理數(shù)據(jù)
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
T pop()
{
// 將任務(wù)從隊列中拿出來,多線程加鎖,每一只能一個線程拿任務(wù)
pthread_mutex_lock(&_mutex);
// 如果隊列是空的就將消費者阻塞
while (isempty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
T tmp = _q.front();
_q.pop();
// 成功拿到數(shù)據(jù)以后,喚醒生產(chǎn)者繼續(xù)生產(chǎn)任務(wù)
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
return tmp;
}
~BlockQueue()
{
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
private:
bool isempty()
{
return _q.empty();
}
bool isfull()
{
return _q.size() == _cap;
}
private:
queue<T> _q; // 隊列
size_t _cap; // 容量
pthread_cond_t _c_cond; // 消費者條件變量
pthread_cond_t _p_cond; // 生產(chǎn)者條件變量
pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥鎖
};
cp模型:
#include "BlockQueue.hpp"
using namespace std;
// 構(gòu)建任務(wù)
struct Task
{
Task(int a, int b, char op)
: _a(a), _b(b), _op(op)
{
}
char _op; // 運算符
int _a; // 運算數(shù)1
int _b; // 運算數(shù)2
int ret; // 結(jié)果
int _exitcode; // 退出碼
};
void *push_task(void *args)
{
BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);
while (1)
{
char op_arr[4] = {'+', '-', '*', '/'};
int a = rand() % 10;
int b = rand() % 10;
char op = op_arr[(a * b) % 4];
// 構(gòu)建任務(wù)結(jié)構(gòu)體
Task tk(a, b, op);
// push任務(wù)
pBQ->push(tk);
printf("%d %c %d =?\n", a, op, b);
sleep(1);
}
return NULL;
}
void *get_task(void *args)
{
BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);
while (1)
{
// 獲取任務(wù)并處理
Task tk = pBQ->pop();
switch (tk._op)
{
case '+':
tk.ret = tk._a + tk._b;
break;
case '-':
tk.ret = tk._a - tk._b;
break;
case '*':
tk.ret = tk._a * tk._b;
break;
case '/':
if (tk._b == 0)
{
exit(-1);
}
tk.ret = tk._a / tk._b;
break;
default:
break;
}
printf("%d %c %d = %d\n", tk._a, tk._op, tk._b, tk.ret);
sleep(1);
}
return NULL;
}
int main()
{
BlockQueue<Task> BQ(5);
pthread_t tid_c[4];
pthread_t tid_p[4];
srand(time(nullptr));
// push
pthread_create(&tid_c[0], NULL, push_task, &BQ);
pthread_create(&tid_c[1], NULL, push_task, &BQ);
pthread_create(&tid_c[2], NULL, push_task, &BQ);
pthread_create(&tid_c[3], NULL, push_task, &BQ);
// get
pthread_create(&tid_p[0], NULL, get_task, &BQ);
pthread_create(&tid_p[1], NULL, get_task, &BQ);
pthread_create(&tid_p[2], NULL, get_task, &BQ);
pthread_create(&tid_p[3], NULL, get_task, &BQ);
pthread_join(tid_c[0], NULL);
pthread_join(tid_c[1], NULL);
pthread_join(tid_c[2], NULL);
pthread_join(tid_c[3], NULL);
pthread_join(tid_p[0], NULL);
pthread_join(tid_p[1], NULL);
pthread_join(tid_p[2], NULL);
pthread_join(tid_p[3], NULL);
return 0;
}
測試結(jié)果:
?四.POSIX信號量
POSIX信號量和SystemV信號量作用相同,都是用于同步操作,達到無沖突的訪問共享資源目的。 但POSIX可以用于線程間同步。
定義信號量:
sem_t sem;
初始化信號量:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
參數(shù):
- pshared:0表示線程間共享,非零表示進程間共享。
- value:信號量初始值。
銷毀信號量:
int sem_destroy(sem_t *sem);
等待信號量:
功能:等待信號量,會將信號量的值減1。
int sem_wait(sem_t *sem); //P()
發(fā)布信號量:
功能:發(fā)布信號量,表示資源使用完畢,可以歸還資源了。將信號量值加1。
int sem_post(sem_t *sem);//V()
說明:
- 信號量本身就是一個計數(shù)器,用來描述可用資源的數(shù)目。
- 信號量機制就像是我們看電影買票一樣,是對資源的預(yù)定機制。
- 只有申請到信號量才能對共享資源訪問。
- 只要我們申請信號量成功了,將來我們一定可以訪問臨界資源,就像看電影,我們只要買到了電影票,不管我們?nèi)ゲ蝗ル娪霸海娪霸豪镆欢ㄓ形覀兊奈恢谩?/li>
五.基于環(huán)形隊列的生產(chǎn)消費模型
環(huán)形隊列采用數(shù)組模擬,用模運算來模擬環(huán)狀特性。
環(huán)形結(jié)構(gòu)起始狀態(tài)和結(jié)束狀態(tài)都是一樣的,不好判斷為空或者為滿,所以可以通過加計數(shù)器或者標(biāo)記位來判斷滿或者空。另外也可以預(yù)留一個空的位置,作為滿的狀態(tài)。
代碼:
RingQueue.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
#include "mutex.hpp"
#include "Task.hpp"
using namespace std;
const size_t size = 5;
template <class T>
class RingQueue
{
void P(sem_t &sem) // 申請信號量
{
sem_wait(&sem);
}
void V(sem_t &sem) // 釋放信號量
{
sem_post(&sem);
}
public:
RingQueue(int cap = size)
: _cap(cap), _index_space(0), _index_date(0)
{
// 初始化信號量
sem_init(&_sem_date, 0, 0); // 數(shù)據(jù)信號量初始化為0
sem_init(&_sem_space, 0, cap); // 空間信號量初始化為容量大小
// 初始化鎖
pthread_mutex_init(&_mutex, nullptr);
_rq.resize(_cap);
}
void push(const T date)
{
// 申請空間信號量
P(_sem_space);
{
MutexGuard lock(&_mutex);
_rq[_index_date++] = date;
_index_date %= _cap;
}
// 釋放數(shù)據(jù)信號量
V(_sem_date);
}
T pop()
{
// 申請數(shù)據(jù)信號量
P(_sem_date);
T tmp;
{
MutexGuard lock(&_mutex);
tmp = _rq[_index_space++];
_index_space %= _cap;
}
// 釋放空間信號量
V(_sem_space);
return tmp;
}
~RingQueue()
{
// 釋放信號量和互斥鎖
sem_destroy(&_sem_date);
sem_destroy(&_sem_space);
pthread_mutex_destroy(&_mutex);
}
private:
vector<T> _rq;
size_t _cap; // 容量
sem_t _sem_space; // 記錄環(huán)形隊列的空間信號量
sem_t _sem_date; // 記錄環(huán)形隊列的數(shù)據(jù)信號量
size_t _index_space; // 生產(chǎn)者的生產(chǎn)位置
size_t _index_date; // 消費者的消費位置
pthread_mutex_t _mutex; // 容量
};
mutex.hpp:
class Mutex
{
public:
Mutex(pthread_mutex_t *mutex)
: _mutex(mutex)
{
}
void lock()
{
pthread_mutex_lock(_mutex);
}
void unlock()
{
pthread_mutex_unlock(_mutex);
}
~Mutex()
{
}
private:
pthread_mutex_t *_mutex;
};
class MutexGuard
{
public:
MutexGuard(pthread_mutex_t *mutex)
: _mutex(mutex)
{
_mutex.lock();
}
~MutexGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
Task.hpp:文章來源:http://www.zghlxwxcb.cn/news/detail-713361.html
#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>
struct Task
{
Task(int a = 1, int b = 1, char op = '+')
: _a(a), _b(b), _op(op)
{
}
void run()
{
switch (_op)
{
case '+':
_ret = _a + _b;
break;
case '-':
_ret = _a - _b;
break;
case '*':
_ret = _a * _b;
break;
case '/':
if (_b == 0)
{
_exitcode = -1;
exit(1);
}
_ret = _a / _b;
break;
default:
break;
}
}
void showtask()
{
printf("producer:%d %c %d = ?\n", _a, _op, _b);
}
void showresult()
{
printf("consumer:%d %c %d = %d(exitcode:%d)\n", _a, _op, _b, _ret, _exitcode);
}
~Task() {}
private:
int _a;
int _b;
char _op;
int _ret;
int _exitcode = 0;
};
pthread.cc:文章來源地址http://www.zghlxwxcb.cn/news/detail-713361.html
#include "RingQueue.hpp"
void *run_p(void *args)
{
char ops[4] = {'+', '-', '*', '/'};
RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);
while (1)
{
int a = rand() % 100;
int b = rand() % 100;
int op = ops[(a * b) % 4];
Task tk(a, b, op);
RQ->push(tk);
tk.showtask();
sleep(1);
}
}
void *run_c(void *args)
{
RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);
while (1)
{
Task tk = RQ->pop();
tk.run();
tk.showresult();
sleep(1);
}
}
int main()
{
RingQueue<Task> *RQ = new RingQueue<Task>(5);
srand(time(0));
pthread_t tid_c[5];
pthread_t tid_p[5];
for (int i = 0; i < 5; i++)
{
pthread_create(&tid_c[i], nullptr, run_c, RQ);
pthread_create(&tid_p[i], nullptr, run_p, RQ);
}
for (int i = 0; i < 5; i++)
{
pthread_join(tid_c[i], nullptr);
pthread_join(tid_p[i], nullptr);
}
delete RQ;
return 0;
}
到了這里,關(guān)于Linux——生產(chǎn)者消費者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!