阻塞隊(duì)列(BlockQueue)介紹
阻塞隊(duì)列(Blocking Queue)是一種特殊類型的隊(duì)列,它具有阻塞操作的特性。在并發(fā)編程中,阻塞隊(duì)列可以用于實(shí)現(xiàn)線程間的安全通信和數(shù)據(jù)共享。
阻塞隊(duì)列的 主要特點(diǎn) 是:
- 當(dāng)隊(duì)列為空時(shí),消費(fèi)者線程嘗試從隊(duì)列中獲?。ǔ鲫?duì))元素時(shí)會(huì)被阻塞,直到有新的元素被添加到隊(duì)列中為止。
- 當(dāng)隊(duì)列已滿時(shí),生產(chǎn)者線程嘗試向隊(duì)列中添加(入隊(duì))元素時(shí)也會(huì)被阻塞,直到有空閑容量可用。
阻塞隊(duì)列通常提供入隊(duì)操作、出隊(duì)操作以及獲取隊(duì)列大小等基本方法。
阻塞隊(duì)列的實(shí)現(xiàn)在下文
生產(chǎn)者消費(fèi)者模型 介紹
生產(chǎn)者消費(fèi)者模型 是一種常用的 并發(fā)編程模型 ,用于解決多線程或多進(jìn)程環(huán)境下的協(xié)作問題。該模型包含兩類角色:生產(chǎn)者和消費(fèi)者。
生產(chǎn)者負(fù)責(zé)生成數(shù)據(jù),并將數(shù)據(jù)存放到共享的緩沖區(qū)中。消費(fèi)者則從緩沖區(qū)中獲取數(shù)據(jù)并進(jìn)行處理。生產(chǎn)者和消費(fèi)者之間通過共享的緩沖區(qū)進(jìn)行數(shù)據(jù)交互。
為了確保線程安全,生產(chǎn)者和消費(fèi)者需要遵循一些規(guī)則:
- 如果緩沖區(qū)已滿,則生產(chǎn)者需要等待直到有空間可用。
- 如果緩沖區(qū)為空,則消費(fèi)者需要等待直到有數(shù)據(jù)可用。
- 生產(chǎn)者和消費(fèi)者都不能訪問緩沖區(qū)的內(nèi)部結(jié)構(gòu),只能通過特定的接口進(jìn)行操作。
代碼實(shí)現(xiàn)
在代碼實(shí)現(xiàn)上,生產(chǎn)者消費(fèi)者模型通常涉及以下幾個(gè) 角色和操作 :
- 生產(chǎn)者(Producer):負(fù)責(zé)生成數(shù)據(jù)并將其放入共享的緩沖區(qū)。
- 消費(fèi)者(Consumer):從共享的緩沖區(qū)中獲取數(shù)據(jù)并進(jìn)行處理。
- 緩沖區(qū)(Buffer):用于暫存生產(chǎn)者生成的數(shù)據(jù),供消費(fèi)者使用。
- 同步機(jī)制:用于確保生產(chǎn)者和消費(fèi)者之間的協(xié)調(diào)和同步,以避免競態(tài)條件和數(shù)據(jù)不一致性等問題。
我們將要實(shí)現(xiàn)的代碼中:
阻塞隊(duì)列 作為緩沖區(qū),Task任務(wù)類 由生產(chǎn)者生產(chǎn)傳入阻塞隊(duì)列,以便消費(fèi)者拿去任務(wù)消費(fèi),lockGuard 與條件變量 保證 生產(chǎn)者消費(fèi)者之間的協(xié)調(diào),同步。
lockGuard.hpp()
在 lockGuard.hpp中我們 實(shí)現(xiàn)了一個(gè) 需封裝了互斥鎖的Mutex類和一個(gè) 實(shí)現(xiàn)自動(dòng)加解鎖的lockGuard類。
Mutex
類封裝了pthread_mutex_t類型的互斥鎖, lockGuard
類是一個(gè)RAII風(fēng)格的加鎖方式。
通過這種方式,lockGuard對(duì)象的生命周期和鎖的生命周期綁定在一起,可以確保在任何情況下都能保證鎖的正確釋放,避免死鎖等問題
完整代碼:
#pragma once
#include <iostream>
#include <pthread.h>
using std::cout; using std::endl;
// Mutex類封裝 pthread_mutex_t 互斥鎖
class Mutex
{
public:
// 構(gòu)造
Mutex(pthread_mutex_t* mtx):_pmtx(mtx){}
// 調(diào)用lock 進(jìn)行加鎖
void lock()
{
cout << "進(jìn)行加鎖" << endl;
pthread_mutex_lock(_pmtx);
}
// 調(diào)用unlock 進(jìn)行解鎖
void unlock()
{
cout << "進(jìn)行解鎖" << endl;
pthread_mutex_unlock(_pmtx);
}
~Mutex()
{}
private:
pthread_mutex_t* _pmtx;
};
// RAII 風(fēng)格的加鎖方式
// 以實(shí)現(xiàn)自動(dòng)加解鎖
class lockGuard
{
public:
// 構(gòu)造
lockGuard(pthread_mutex_t* mtx):_mtx(mtx)
{
_mtx.lock();
}
// 析構(gòu)
~lockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
Task.hpp(任務(wù)類)
下面的代碼 是一個(gè)簡化的 任務(wù)封裝類 ,用于生產(chǎn)者產(chǎn)生任務(wù)并將其放入阻塞隊(duì)列,供消費(fèi)者取出并執(zhí)行。通過將函數(shù)與參數(shù)打包成任務(wù),實(shí)現(xiàn)了任務(wù)的傳遞和執(zhí)行。
#pragma once
#include <iostream>
#include <functional>
// 表示一個(gè)函數(shù)類型。
// func_t是一個(gè)接受兩個(gè)整數(shù)參數(shù)并返回整數(shù)的函數(shù)類型
typedef std::function<int(int, int)> func_t;
// 任務(wù)類型: 用于生產(chǎn)者產(chǎn)生任務(wù)
class Task
{
public:
Task(){};
// 傳入三個(gè)參數(shù)x,y,以及一個(gè)函數(shù),task則執(zhí)行func(x, y)
Task(int x, int y, func_t func):_x(x),_y(y),_func(func)
{}
// 用于執(zhí)行任務(wù)。在函數(shù)體內(nèi)部,會(huì)調(diào)用存儲(chǔ)在 _func 中的函數(shù)對(duì)象,
// 并將 _x 和 _y 作為參數(shù)傳遞給這個(gè)函數(shù)對(duì)象。
// 最后 返回執(zhí)行結(jié)果。
int operator()()
{
return _func(_x,_y);
}
public:
// 用作函數(shù)參數(shù)
int _x;
int _y;
func_t _func;
};
BlockQueue.hpp(阻塞隊(duì)列)
對(duì) 阻塞隊(duì)列 進(jìn)行類的實(shí)現(xiàn):
BlockQueue包含以下成員變量:
std::queue<T> _bq; // 阻塞隊(duì)列
int _capacity; // 容量上限
pthread_mutex_t _mtx; // 互斥鎖: 保證隊(duì)列安全
pthread_cond_t _empty; // 表示bq是否為空
pthread_cond_t _full; // 表示bq是否為滿
以及除構(gòu)造函數(shù)/析構(gòu)函數(shù)外的以下 BlockQueue包含以下成員函數(shù):
bool isQueueEmpty() // 判斷隊(duì)列是否為空
bool isQueueFull() // 判斷隊(duì)列是否為滿
void push(const T &in) // 生產(chǎn)者用于制造任務(wù)
void pop(const T &in) // 消費(fèi)者用于消耗任務(wù)
完整代碼:
#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap = 5; // 作為阻塞隊(duì)列的默認(rèn)容量
// 阻塞隊(duì)列
template <class T>
class BlockQueue
{
private:
// 判斷隊(duì)列是否為空
bool isQueueEmpty()
{
return _bq.size() == 0;
}
// 判滿
bool isQueueFull()
{
return _bq.size() == _capacity; // 當(dāng)size == _capacity 證明隊(duì)列已滿
}
public:
// 構(gòu)造
BlockQueue(int capacity = gDefaultCap) : _capacity(capacity)
{
// 初始化互斥鎖 && 條件變量
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_empty, nullptr);
pthread_cond_init(&_full, nullptr);
}
// 析構(gòu)
~BlockQueue()
{
// 銷毀 互斥鎖 && 條件變量
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
// 生產(chǎn)者進(jìn)程
void push(const T &in)
{
// 創(chuàng)建一個(gè)lockGuard 變量
lockGuard lockguard(&_mtx);
while(isQueueFull())
{
// 如果此時(shí)阻塞隊(duì)列為滿,進(jìn)程等待,直到有空位時(shí)改變_full
pthread_cond_wait(&_full, &_mtx);
}
// 此時(shí)阻塞隊(duì)列有空位,正常插入元素,并
_bq.push(in);
pthread_cond_signal(&_empty); // 發(fā)送信號(hào),表示隊(duì)列不再為空
pthread_mutex_unlock(&_mtx);
}
// 消費(fèi)者進(jìn)程
void pop(T *out)
{
lockGuard lockguard(&_mtx);
// pthread_mutex_lock(&mtx_);
while (isQueueEmpty()) // 如果隊(duì)列為空,等待生產(chǎn)者制造任務(wù)
pthread_cond_wait(&_empty, &_mtx);
// 此時(shí)隊(duì)列內(nèi)有任務(wù),
*out = _bq.front(); // 拿_bq的頭部元素,并執(zhí)行pop(拿任務(wù)+銷毀)
_bq.pop();
pthread_cond_signal(&_full);
pthread_mutex_unlock(&_mtx);
}
private:
std::queue<T> _bq; // 阻塞隊(duì)列
int _capacity; // 容量上限
pthread_mutex_t _mtx; // 互斥鎖: 保證隊(duì)列安全
pthread_cond_t _empty; // 表示bq是否為空
pthread_cond_t _full; // 表示bq是否為滿
};
conProd.cc(生產(chǎn)者消費(fèi)者模型 主進(jìn)程)
該文件中包含以下函數(shù):
-
myAdd 函數(shù):一個(gè)簡單的加法函數(shù),即實(shí)際執(zhí)行任務(wù)所執(zhí)行的函數(shù)。
-
consumer 函數(shù):消費(fèi)者線程的執(zhí)行函數(shù)。該函數(shù)從阻塞隊(duì)列中獲取任務(wù),并執(zhí)行任務(wù)的函數(shù)。
-
productor 函數(shù):生產(chǎn)者線程的執(zhí)行函數(shù)。該函數(shù)隨機(jī)生成兩個(gè)整數(shù)參數(shù),創(chuàng)建一個(gè)任務(wù)對(duì)象,并將任務(wù)對(duì)象插入到阻塞隊(duì)列中。
-
main 函數(shù):主函數(shù),用于創(chuàng)建并啟動(dòng)多個(gè)消費(fèi)者線程和生產(chǎn)者線程。通過調(diào)用 pthread_create 創(chuàng)建線程,并通過 pthread_join 等待線程結(jié)束。
完整代碼:
#include "blockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
// 加法函數(shù),用于生產(chǎn)者進(jìn)程產(chǎn)生任務(wù)
int myAdd(int x, int y)
{
return x + y;
}
// 消費(fèi)者進(jìn)程
void *consumer(void* args)
{
// 將獲得的agrs 參數(shù) 強(qiáng)制轉(zhuǎn)化為BlockQueue<Task>* 類型 并賦值給變量bqueue
BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;
while(true)
{
// 獲取任務(wù)
Task t;
bqueue->pop(&t); // 執(zhí)行任務(wù) + 銷毀
// 打印任務(wù)信息,因?yàn)槲覀兪褂玫膬H僅是一個(gè)加法函數(shù),所以直接打印"+"
cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << endl;
}
return nullptr;
}
// 生產(chǎn)者進(jìn)程
void* productor(void* args)
{
BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;
while(true)
{
// 制造任務(wù)
// 生產(chǎn)者將任務(wù)傳到緩沖區(qū),消費(fèi)者再將其消耗
// 任務(wù)不一定有生產(chǎn)者制造,也可能通過外部獲得
// 隨機(jī)產(chǎn)生x, y兩個(gè)參數(shù),執(zhí)行Task
int x = rand() % 10 + 1;
usleep(rand() % 1000);
int y = rand() % 5 + 1;
Task t(x, y, myAdd);
// 發(fā)送任務(wù)
bqueue->push(t);
// 輸出消息
cout << pthread_self() << " productor: " << t._x << " + " << t._y << " = ?" << endl;
sleep(1);
}
return nullptr;
}
int main()
{
// getpid():獲取當(dāng)前進(jìn)程的進(jìn)程ID(PID),用于區(qū)分不同的進(jìn)程。
// 0x11451 用于增加種子的隨機(jī)性
srand((uint64_t)time(nullptr) ^ getpid() ^ 0x11451);
BlockQueue<Task>* bqueue = new BlockQueue<Task>();
pthread_t con[2], pro[2]; // 聲明兩個(gè)消費(fèi)者 / 生產(chǎn)者,增加并行性
// 可以將 &con[1] 換為 con+1
pthread_create(&con[0], nullptr, consumer, bqueue);
pthread_create(&con[1], nullptr, consumer, bqueue);
pthread_create(&pro[0], nullptr, productor, bqueue);
pthread_create(&pro[1], nullptr, productor, bqueue);
// 執(zhí)行完畢,等待進(jìn)程銷毀
pthread_join(con[0], nullptr);
pthread_join(con[1], nullptr);
pthread_join(pro[0], nullptr);
pthread_join(pro[1], nullptr);
delete bqueue; // 銷毀隊(duì)列
return 0;
}
執(zhí)行結(jié)果
文章來源:http://www.zghlxwxcb.cn/news/detail-663484.html
根據(jù)上面的執(zhí)行結(jié)果,可以看出,程序先連續(xù)生產(chǎn)(即加鎖信息的打印),阻塞隊(duì)列滿了后開始消費(fèi),后面重復(fù) 生產(chǎn)消費(fèi)(即加鎖解鎖)的步驟。文章來源地址http://www.zghlxwxcb.cn/news/detail-663484.html
到了這里,關(guān)于基于 BlockQueue(阻塞隊(duì)列) 的 生產(chǎn)者消費(fèi)者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!