1. 生產(chǎn)消費者模型基本概念
生產(chǎn)者消費者模型是一種常用的并發(fā)設(shè)計模式,它可以解決生產(chǎn)者和消費者之間的速度不匹配、解耦、異步等問題。生產(chǎn)者消費者模型的應(yīng)用場景有很多,例如Excutor任務(wù)執(zhí)行框架、消息中間件activeMQ、任務(wù)的處理時間比較長的情況下等。
生產(chǎn)者消費者模型的基本結(jié)構(gòu)如下:
- 生產(chǎn)者(Producer):負(fù)責(zé)生成數(shù)據(jù)或任務(wù),放入緩沖區(qū)(Buffer)中。
- 消費者(Consumer):負(fù)責(zé)從緩沖區(qū)中取出數(shù)據(jù)或任務(wù),進(jìn)行處理。
- 緩沖區(qū)(Buffer):一般是一個有限大小的隊列,用來存儲生產(chǎn)者生成的數(shù)據(jù)或任務(wù),同時提供給消費者使用。
生產(chǎn)者消費者模型的核心是緩沖區(qū),它可以平衡生產(chǎn)者和消費者的處理能力,起到一個數(shù)據(jù)緩存的作用,同時也達(dá)到了一個解耦的作用
。緩沖區(qū)的實現(xiàn)方式有多種,例如:
- 使用Object的wait()和notify()方法,讓生產(chǎn)者和消費者在緩沖區(qū)滿或空時進(jìn)行等待和喚醒。
- 使用Semaphore的acquire()和release()方法,讓生產(chǎn)者和消費者通過信號量控制緩沖區(qū)的訪問。
- 使用BlockingQueue阻塞隊列,讓生產(chǎn)者和消費者通過put()和take()方法自動實現(xiàn)阻塞和喚醒。
- 使用Lock和Condition的await()和signal()方法,讓生產(chǎn)者和消費者通過條件變量控制緩沖區(qū)的狀態(tài)。
- 使用PipedInputStream和PipedOutputStream,讓生產(chǎn)者和消費者通過管道流進(jìn)行通信。
生產(chǎn)者消費者模型的應(yīng)用場景有很多,例如:
- Excutor任務(wù)執(zhí)行框架:通過將任務(wù)的提交和任務(wù)的執(zhí)行解耦開來,提交任務(wù)的操作相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)的操作相當(dāng)于消費者。
- 消息中間件activeMQ: 雙十一的時候,會產(chǎn)生大量的訂單,那么不可能同時處理那么多的訂單,需要將訂單放入一個隊列里面,然后由專門的線程處理訂單。
- 任務(wù)的處理時間比較長的情況下:比如上傳附件并處理,那么這個時候可以將用戶上傳和處理附件分成兩個過程,用一個隊列暫時存儲用戶上傳的附件,然后立刻返回用戶上傳成功,然后有專門的線程處理隊列中的附件。
生產(chǎn)者消費者模型優(yōu)點:
- 解耦:生產(chǎn)者和消費者之間不直接通信,而是通過緩沖區(qū)來進(jìn)行通信,降低了代碼之間的依賴性,簡化了工作負(fù)載的管理。
- 復(fù)用:生產(chǎn)者和消費者可以獨立地進(jìn)行復(fù)用和擴(kuò)展,增加了代碼的可維護(hù)性和可擴(kuò)展性。
調(diào)整并發(fā)數(shù):生產(chǎn)者和消費者的處理速度可能不一致,可以通過調(diào)整并發(fā)數(shù)來平衡速度差異,提高系統(tǒng)的吞吐量和效率。 - 異步:生產(chǎn)者不需要等待消費者處理完數(shù)據(jù)才能繼續(xù)生產(chǎn),消費者也不需要等待生產(chǎn)者生成數(shù)據(jù)才能繼續(xù)消費,通過異步的方式支持高并發(fā),提高系統(tǒng)的響應(yīng)性和靈活性。
- 支持分布式:生產(chǎn)者和消費者可以運行在不同的機器上,通過分布式的緩沖區(qū)來進(jìn)行通信,增加了系統(tǒng)的可伸縮性和容錯性。
2. 基于BlockingQueue的生產(chǎn)者消費者模型
在多線程編程中阻塞隊列(Blocking Queue)是一種常用于實現(xiàn)生產(chǎn)者和消費者模型的數(shù)據(jù)結(jié)構(gòu)。其與普通的隊列區(qū)別在于,當(dāng)隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;當(dāng)隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進(jìn)程操作時會被阻塞)。
基于BlockingQueue的生產(chǎn)者消費者模型,可以封裝一個類,這個類就只有簡單的插入、刪除操作是一個簡單的阻塞隊列,并且內(nèi)部用條件變量來實現(xiàn)。
blockQueue.hpp源代碼:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;
const int gcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(const int cap = gacp)
:_cap = cap
{
pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
pthread_cond_init(&_productorCond, nullptr);
}
bool isFull()
{
return _q.size() == _cap;
}
bool isEmpty()
{
return _q.empty();
}
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
while (isFull())
{
pthread_cond_wait(&_productorCond, &_mutex);
sleep(1); // 每隔1s詢問一次隊列是否為滿,因為消費者可能在這1s中消費
}
_q.push(in); // 隊列不滿,則可以插入
pthread_cond_signal(&_consumerCond); // 隊列某一時刻可能為空,消費者被阻塞,所以需要在這喚醒消費者
// 線程如果醒著,那么再喚醒不會出問題;相反線程阻塞,再次用函數(shù)阻塞也沒問題
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&mutex);
while (isEmpty())
{
pthread_cond_wait(&_consumerCond, &_mutex);
sleep(1);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_productorCond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_productorCond);
}
private:
queue<T> _q;
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _consumerCond; // 消費者對應(yīng)的條件變量,若空,則wait
pthread_cond_t _productorCond; // 生產(chǎn)者對應(yīng)的條件變量,若滿,則wait
};
然后就可以實現(xiàn)簡單的單生產(chǎn)單消費以及多生產(chǎn)多消費的樣例,main文件中的代碼如下:
#include "blockQueue.hpp"
#include <time.h>
void *productor(void *args)
{
BlockQueue<int> *q = static_cast<BlockQueue<int>*>(args);
int count = 20;
while (count--)
{
int val = rand() % 5 + 1;
cout << "生產(chǎn)的數(shù)據(jù):" << val << endl;
q->push(val);
}
return nullptr;
}
void *consumer(void* args)
{
BlockQueue<int> *q = static_cast<BlockQueue<int>*>(args);
while (true)
{
int val = 0;
q->pop(&val);
cout << "消費的數(shù)據(jù):" << val << endl;
usleep(300);
if (q->isEmpty())
break;
}
return nullptr;
}
// 單生產(chǎn)單消費
int main()
{
srand((unsigned int)time(0)); //隨機數(shù)種子
BlockQueue<int> *q = new BlockQueue<int>;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, q);
pthread_create(&p, nullptr, productor, q);
while (true)
{
sleep(1);
if (q->isEmpty())
break;
}
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete q;
return 0;
}
//多生產(chǎn)多消費
// int main()
// {
// srand((uint64_t)time(nullptr));
// // BlockQueue<int> *bq = new BlockQueue<int>();
// BlockQueue<int> *bq = new BlockQueue<int>();
// // 單生產(chǎn)和單消費 -> 多生產(chǎn)和多消費
// pthread_t c[2], p[3];
// pthread_create(&c[0], nullptr, consumer, bq);
// pthread_create(&c[1], nullptr, consumer, bq);
// pthread_create(&p[0], nullptr, productor, bq);
// pthread_create(&p[1], nullptr, productor, bq);
// pthread_create(&p[2], nullptr, productor, bq);
// pthread_join(c[0], nullptr);
// pthread_join(c[1], nullptr);
// pthread_join(p[0], nullptr);
// pthread_join(p[1], nullptr);
// pthread_join(p[2], nullptr);
// delete bq;
// return 0;
// }
運行結(jié)果如下:
基于BlockingQueue的生產(chǎn)者消費者模型是一種常見的多線程設(shè)計模式,它有以下幾個優(yōu)點:
- 簡化編程:BlockingQueue提供了線程安全的入隊和出隊操作,無需自己實現(xiàn)同步和鎖機制,降低了編程難度和出錯風(fēng)險。
- 提高性能:BlockingQueue支持阻塞和超時機制,可以根據(jù)隊列的狀態(tài)自動調(diào)整生產(chǎn)者和消費者的狀態(tài),避免了無效的等待和輪詢,提高了系統(tǒng)的吞吐量和響應(yīng)速度。
- 增強可擴(kuò)展性:BlockingQueue可以作為有界隊列或無界隊列使用,可以根據(jù)實際需求調(diào)整隊列的容量和策略,增加了系統(tǒng)的靈活性和可擴(kuò)展性。
3. 基于環(huán)形隊列的生產(chǎn)消費模型
環(huán)形隊列是一種特殊的隊列,它是在隊列的基礎(chǔ)上添加了一些限制條件,使得隊列可以在固定大小的存儲空間下進(jìn)行循環(huán)使用。環(huán)形隊列可以用數(shù)組實現(xiàn),數(shù)組中的元素按照一定的順序排列,并且當(dāng)隊列頭或者隊列尾指針到達(dá)數(shù)組的尾部時,會自動從數(shù)組的頭部開始重新循環(huán)使用。環(huán)形隊列的一個好處是,當(dāng)隊列滿時,可以通過覆蓋隊列頭部的元素來繼續(xù)存儲新的元素,這樣可以使得隊列在一定程度上具有循環(huán)使用的能力,節(jié)省存儲空間。但是在使用環(huán)形隊列時需要注意一些細(xì)節(jié)問題,比如隊列空、隊列滿、隊列大小等等。
環(huán)形結(jié)構(gòu)起始狀態(tài)和結(jié)束狀態(tài)都是一樣的,不好判斷為空或者為滿,所以可以通過加計數(shù)器或者標(biāo)記位來判斷滿或者空。另外也可以預(yù)留一個空的位置,作為滿的狀態(tài)。但是現(xiàn)在有信號量這個計數(shù)器,就很簡單的進(jìn)行多線程間的同步過程,所以環(huán)形隊列的生產(chǎn)消費者模型內(nèi)部使用信號量來實現(xiàn)。
基于環(huán)形隊列的生產(chǎn)消費模型的代碼如下:
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
template <class T>
class RingQueue
{
public:
RingQueue(int num = N)
:_ring(num)
,_cap(num)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, _cap);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mutex,nullptr);
pthread_mutex_init(&_p_mutex,nullptr);
}
// 生產(chǎn)
void push(const T &in)
{
P(_space_sem);
Lock(_p_mutex);
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mutex);
V(_data_sem);
}
// 消費
void pop(T &out)
{
P(_data_sem);
Lock(_c_mutex);
out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
private:
vector<T> _ring;
int _cap; // 環(huán)形隊列容器大小
sem_t _data_sem; // 表示數(shù)據(jù)量的信號量,只有消費者關(guān)心
sem_t _space_sem; // 表示空間量的信號量,只有生產(chǎn)者關(guān)心
int _c_step; // 環(huán)形隊列中消費的位置
int _p_step; // 環(huán)形隊列中生產(chǎn)的位置
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
然后生產(chǎn)一些任務(wù),這些任務(wù)不弄簡單的隨機數(shù),而弄一些隨機出來的加減乘除運算,所以再封裝一個類,這個類可以做加減乘除運算,該類的代碼如下:
#pragma once
#include <iostream>
using namespace std;
class Task
{
public:
Task()
{}
Task(const int x, const int y, const char op)
:_x(x)
,_y(y)
,_op(op)
,_exitCode(0)
{}
void operator()()
{
switch(_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
break;
case '%':
_result = _x % _y;
break;
default:
break;
}
}
string formatArg() // 輸入的格式
{
return to_string(_x) + _op +to_string(_y) + '=';
}
string formatRes() // 輸出的格式
{
return to_string(_result) + '(' + to_string(_exitCode) + ')';
}
~Task()
{}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
然后直接用多生產(chǎn)多消費進(jìn)行驗證,多生產(chǎn)多消費的main.cc文件代碼如下:
#include "RingQueue.hpp"
#include "task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
using namespace std;
const char *ops = "+-*/%";
void *consumerRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
rq->pop(t);
t();
cout << "consumer done, 處理完成的任務(wù)是: " << t.formatRes() << endl;
}
}
void *productorRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生產(chǎn)的任務(wù)是: " << t.formatArg() << endl;
}
}
int main()
{
srand(time(nullptr));
RingQueue<Task> *rq = new RingQueue<Task>();
// 單生產(chǎn)單消費
// pthread_t c, p;
// pthread_create(&c, nullptr, consumerRoutine, rq);
// pthread_create(&p, nullptr, productorRoutine, rq);
// pthread_join(c, nullptr);
// pthread_join(p, nullptr);
pthread_t c[3], p[2];
for (int i = 0; i < 3; i++)
pthread_create(c + i, nullptr, consumerRoutine, rq);
for (int i = 0; i < 2; i++)
pthread_create(p + i, nullptr, productorRoutine, rq);
for (int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for (int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
delete rq;
return 0;
}
最后就會有如下形式的任務(wù)被生產(chǎn)者派發(fā),然后由消費者處理問題。
基于環(huán)形隊列的生產(chǎn)消費模型是一種常見的并發(fā)同步模式,它有以下優(yōu)缺點:
優(yōu)點:
- 解耦:生產(chǎn)者和消費者不直接交互,而是通過環(huán)形隊列進(jìn)行數(shù)據(jù)傳遞,降低了兩者之間的耦合度。
- 支持并發(fā):生產(chǎn)者和消費者可以同時訪問環(huán)形隊列的不同位置,提高了并發(fā)性能。
- 支持忙閑不均:當(dāng)生產(chǎn)者和消費者的速度不匹配時,環(huán)形隊列可以緩沖數(shù)據(jù),避免數(shù)據(jù)丟失或阻塞。
缺點:
- 需要額外的空間:環(huán)形隊列需要預(yù)先分配固定大小的空間,可能造成空間浪費或不足。
- 需要額外的同步機制:環(huán)形隊列需要使用信號量或其他同步機制來控制生產(chǎn)者和消費者之間的協(xié)作,增加了編程復(fù)雜度。
- 可能出現(xiàn)饑餓或飽和:當(dāng)環(huán)形隊列滿或空時,生產(chǎn)者或消費者可能會長時間等待,影響系統(tǒng)的響應(yīng)性。
4. 線程池
Linux線程池是一種管理多個線程的技術(shù),它可以提高程序的性能和資源利用率。Linux線程池的基本思想是:
- 預(yù)先創(chuàng)建一定數(shù)量的線程,放在一個池中,這些線程稱為核心線程。
- 當(dāng)有新的任務(wù)到來時,如果有空閑的核心線程,就分配給它執(zhí)行;如果沒有空閑的核心線程,就將任務(wù)放在一個任務(wù)隊列中,等待有空閑的線程來執(zhí)行。
- 如果任務(wù)隊列也滿了,就創(chuàng)建新的線程,超過核心線程數(shù)量的線程稱為非核心線程。
- 如果非核心線程空閑時間超過一定的限制,就銷毀這些線程,回收資源。
- 如果核心線程空閑時間超過一定的限制,并且設(shè)置了允許回收核心線程的標(biāo)志,就銷毀這些線程,回收資源。
Linux線程池的優(yōu)點有:
- 降低創(chuàng)建和銷毀線程的開銷,提高響應(yīng)速度。
- 控制并發(fā)的數(shù)量,避免過多的線程競爭,提高系統(tǒng)穩(wěn)定性。
- 統(tǒng)一管理和調(diào)度線程,提高代碼的可維護(hù)性。
Linux線程池的實現(xiàn)方法有:
- 使用POSIX標(biāo)準(zhǔn)提供的pthread庫來創(chuàng)建和管理線程,使用互斥鎖和條件變量來實現(xiàn)任務(wù)隊列和同步機制。
- 使用C++標(biāo)準(zhǔn)庫中的std::thread類來創(chuàng)建和管理線程,使用std::queue容器來實現(xiàn)任務(wù)隊列,使用std::mutex和std::condition_variable來實現(xiàn)同步機制。
- 使用第三方庫或框架來實現(xiàn)線程池,例如Boost.Asio、libevent、libuv等。
對于上述所描述的,可以使用互斥鎖和條件變量來實現(xiàn)任務(wù)隊列和同步機制,實現(xiàn)簡單的線程池,代碼如下:文章來源:http://www.zghlxwxcb.cn/news/detail-723417.html
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <unistd.h>
#include "task.hpp"
using namespace std;
const static int N = 5;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N)
:_num(num)
,_threads(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
void lockQueue()
{
pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void threadWait()
{
pthread_cond_wait(&_cond, &_mutex);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
void pushTask(const T &t)
{
lockQueue();
_tasks.push(t);
threadWakeup(); // 插入任務(wù)后喚醒線程處理任務(wù)
unlockQueue();
}
static void *threadRoutine(void *args) // 對于類的內(nèi)部成員函數(shù),會有默認(rèn)的this指針,所以可以將這個函數(shù)定義在類的外部,
// 或者定義靜態(tài)成員函數(shù),但靜態(tài)成員函數(shù)不能直接訪問類的內(nèi)部成員。
{
pthread_detach(pthread_self()); // 線程分離,這樣子線程就可以自己釋放自己的資源
ThreadPool<T> *tp = static_cast<ThreadPool<T>*>(args);
// 對于tp指針來說,他不能訪問私有成員,所以可以用一些函數(shù)去訪問類的私有成員,或者將私有成員暴露出來,屬性設(shè)置為public
while (true)
{
// 檢測有沒有任務(wù)
tp->lockQueue();
while (tp->isEmpty())
{
tp->threadWait();
}
T t = tp->popTask(); // 拿出隊列中的任務(wù)
tp->unlockQueue();
//test:放入一些數(shù)據(jù)
t(); // task任務(wù)內(nèi)部是用該仿函數(shù)來處理任務(wù)的
cout << "thread handler done, result: " << t.formatRes() << std::endl;
}
}
void start()
{
for (int i = 0; i < _num; ++i)
{
pthread_create(&_threads[i], nullptr, threadRoutine, this);
}
}
~ThreadPool()
{
pthread_cond_destroy(&_cond);
pthread_mutex_destroy(&_mutex);
}
private:
vector<pthread_t> _threads;
int _num;
queue<T> _tasks;
pthread_mutex_t _mutex; // 使用互斥鎖和條件變量來實現(xiàn)任務(wù)隊列和同步機制
pthread_cond_t _cond;
};
#include <memory>
int main()
{
unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->start();
while (true)
{
int x, y;
char op;
cout << "please Enter x> ";
cin >> x;
cout << "please Enter y> ";
cin >> y;
cout << "please Enter op(+-*/%)> ";
cin >> op;
Task t(x, y, op);
tp->pushTask(t);
usleep(500);
}
return 0;
}
當(dāng)有任務(wù)時,該線程池會處理任務(wù),沒有任務(wù)是則會等待。文章來源地址http://www.zghlxwxcb.cn/news/detail-723417.html
到了這里,關(guān)于Linux線程同步實例的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!