目錄
一、生產者消費者模型
二、代碼實現模型
2.1 BlockQueue.hpp
2.2 MainCP.cc
2.3 執(zhí)行結果
三、效率優(yōu)勢
一、生產者消費者模型
將上述圖片邏輯轉換成代碼邏輯就是,一批線程充當生產者角色,一批線程充當消費者角色,倉庫是生產者和消費者獲取的公共資源!下面我想用321原則來解釋這個模型。
既然是公共資源,那么我們就需要考慮線程安全問題!
3指的是三者之間的關系!當一個生產者向倉庫生產資源的時候,消費者不可以進入倉庫取資源!同樣,當一個生產者向倉庫放入資源的時候,其他生產者不能同時也向同一個位置放入資源,也就是說生產者要等上一個生產者走出倉庫后進入倉庫!同樣消費者與消費者也是這樣的關系!也就是說生產者與生產者互斥,生產者與消費者互斥,消費者與消費者互斥!
當消費者與生產者完成自己的操作后,會提醒對方前來進行對方的操作!也就是說,生產者與消費者同步!
2指的是2中角色:生產者和消費者
1指的是存儲資源的容器(倉庫):一段特定結構的緩沖區(qū)(隊列、棧、鏈表)
二、代碼實現模型
2.1 BlockQueue.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<queue>
template <class T>
class BlockQueue
{
public:
static const int gmaxcap = 5;
BlockQueue(const int maxcap = gmaxcap):_capacity(maxcap)
{
//初始化
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_pcond,nullptr);
pthread_cond_init(&_ccond,nullptr);
}
void push(const T& in)
{
//加鎖互斥
pthread_mutex_lock(&_mutex);
//細節(jié):為什么這里用while循環(huán)判斷?
//-->有一種可能性,當生產者很多而消費者只有一個,第一次條件滿足進入阻塞后,消費者處理了
// 一個任務
//這樣就會同時喚醒一批生產者,如果只是if,他們不會再次判斷而是直接“同時!”進行后面push的
//邏輯!這樣不是線程安全的
while(is_full())
{
pthread_cond_wait(&_pcond,&_mutex);
}
//進行到這表示一定有空余空間存數據
_q.push(in);
//喚醒消費者
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
//和上面邏輯一樣
while(is_empty())
{
pthread_cond_wait(&_ccond,&_mutex);
}
//到這表示一定有數據
*out = _q.front();
_q.pop();
//喚醒生產者
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
bool is_empty()
{
return _q.empty();
}
bool is_full()
{
return _q.size() == _capacity;
}
private:
std::queue<T> _q; //存儲任務隊列
int _capacity;
pthread_mutex_t _mutex; //一把鎖 -> 3互斥原則
pthread_cond_t _pcond; //生產者條件變量
pthread_cond_t _ccond; //消費者條件變量
};
2.2 MainCP.cc
#include "BlockQueue.hpp"
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <functional>
// 任務對象
class Task
{
public:
//==typedef
using func_t = std::function<double(int, int, char)>;
Task() {}
Task(func_t callback, int x = 0, int y = 0, char op = '+') : _x(x), _y(y), _op(op), _callback(callback)
{
}
// 仿函數
std::string operator()()
{
double ret = _callback(_x, _y, _op);
char buffer[64];
snprintf(buffer, sizeof buffer, "%d %c %d = %lf", _x, _op, _y, ret);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback; // 回調函數
};
//處理數據函數
double calculator(int x, int y, char op)
{
double ret = 0.0;
switch (op)
{
case '+':
ret = x + y;
break;
case '-':
ret = x - y;
break;
case '*':
ret = x * y;
break;
case '/':
if (y == 0)
ret = 0;
else
ret = (double)x / y;
break;
case '%':
if (y == 0)
ret = 0;
else
ret = x % y;
break;
default:
break;
}
return ret;
}
// 生產者任務
void *producer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1.獲取數據
const char *str = "+-*/%";
int x = rand() % 10 + 1;
int y = rand() % 5 + 1;
char op = str[rand() % 5];
// 2.構建任務對象&傳送對于的處理方法
Task t(calculator, x, y, op);
// 3.存入隊列
bq->push(t);
std::cout << "生產任務: " << x << " " << op << " " << y << " = ?" << std::endl;
sleep(1);
}
}
// 消費者任務
void *consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t;
// 1.獲取任務
bq->pop(&t);
// 2.執(zhí)行仿函數
std::cout << "消費任務: " << t() << std::endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
pthread_t c, p;
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, producer, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
2.3 執(zhí)行結果
文章來源:http://www.zghlxwxcb.cn/news/detail-701561.html
三、效率優(yōu)勢
上面的是單個線程,可以拓展到多生產者多消費者!但是我們疑惑的是這個模型在訪問資源的時候,都是互斥的!他們只有一個能進入到臨界區(qū),這樣怎么是高效的?原來這個模型的高效不是在訪問臨界資源,而是對于每個線程可以獨立的準備數據!我們上述實現的數據來源以及數據處理是簡單的,但是后面數據可能來源于網絡或者磁盤文件,這個過程每個線程都可以同時在線獲取或者處理!這個過程如果串行是低效率的,但是多線程可以大大提高IO效率!這就是生產者消費者模型高效的原因!它可以將IO數據分散給多個線程并行處理,極大地提高了效率!文章來源地址http://www.zghlxwxcb.cn/news/detail-701561.html
到了這里,關于Linux基于多線程和任務隊列實現生產消費模型的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!