一.條件變量
- 條件變量是線程間共享的全局變量,線程間可以通過條件變量進(jìn)行同步控制
-
條件變量的使用必須依賴于互斥鎖以確保線程安全,線程申請了互斥鎖后,可以調(diào)用特定函數(shù)進(jìn)入條件變量等待隊(duì)列(同時(shí)釋放互斥鎖),其他線程則可以通過條件變量在特定的條件下喚醒該線程(喚醒后線程重新獲得互斥鎖),實(shí)現(xiàn)線程同步.
- 例如一個線程訪問隊(duì)列時(shí),發(fā)現(xiàn)隊(duì)列為空,則它只能等待其它線程將數(shù)據(jù)添加到隊(duì)列中,這種情況就需要用到條件變量.
-
線程同步的概念:在保證數(shù)據(jù)安全的前提下,讓線程能夠按照某種特定的順序訪問共享資源,從而有效避免線程饑餓問題(饑餓問題指線程長時(shí)間等待資源而無法被調(diào)度).
pthread線程庫提供的條件變量操作
//聲明全局互斥鎖并初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//聲明全局條件變量并初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- 線程等待條件:
任務(wù)線程代碼{
pthread_mutex_lock(&mutex);
if(條件為假)
pthread_cond_wait(&cond, &mutex);//等待時(shí)會釋放互斥鎖,等待完后自動加鎖
//訪問共享資源....
pthread_mutex_unlock(&mutex);
}
線程調(diào)用
pthread_cond_wait
等待時(shí),該接口會釋放互斥鎖,等待結(jié)束后自動加鎖
- 控制線程給條件變量發(fā)送喚醒信號
控制線程代碼{
if(滿足喚醒條件){
pthread_mutex_lock(&mutex);
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
}
}
喚醒操作加鎖是為了避免信號丟失
- 示例:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
int cnt = 0;
//聲明全局互斥鎖并初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//聲明全局條件變量并初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *Count(void * args)
{
//線程分離,無需主線程等待
pthread_detach(pthread_self());
uint64_t number = (uint64_t)args;
std::cout << "pthread: " << number << " create success" << std::endl;
while(true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
std::cout << "pthread: " << number << " , cnt: " << cnt++ << std::endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
for(uint64_t i = 0; i < 4; i++)
{
pthread_t tid;
pthread_create(&tid, nullptr, Count, (void*)i);
usleep(1000);
}
sleep(3);
std::cout << "main thread ctrl begin: " << std::endl;
while(true)
{
sleep(1);
//喚醒在cond的等待隊(duì)列中等待的一個線程,默認(rèn)都是第一個
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
//按順序喚醒在cond的等待隊(duì)列中的所有線程
//pthread_cond_broadcast(&cond);
std::cout << "signal one thread..." << std::endl;
}
return 0;
}
-
線程同步過程圖解:
- 條件變量和鎖的銷毀:
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
二.生產(chǎn)者消費(fèi)者模型
- 生產(chǎn)者消費(fèi)者模型是一種多線程并發(fā)協(xié)作的設(shè)計(jì)框架,生產(chǎn)者負(fù)責(zé)生成并發(fā)送數(shù)據(jù),消費(fèi)者負(fù)責(zé)接收并處理數(shù)據(jù).
- 生產(chǎn)者和消費(fèi)者之間存在一個數(shù)據(jù)容器作為緩沖區(qū),生產(chǎn)者生產(chǎn)的數(shù)據(jù)存入容器中,消費(fèi)者需要的數(shù)據(jù)從容器中獲取,實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳輸解耦
- 數(shù)據(jù)容器由互斥鎖保護(hù),同一個時(shí)刻只能有一個線程訪問數(shù)據(jù)容器,生產(chǎn)者和消費(fèi)者之間通過條件變量(或信號量)實(shí)現(xiàn)同步
-
對于數(shù)據(jù)容器的訪問,生產(chǎn)者和消費(fèi)者遵循三個原則:
- 生產(chǎn)者和生產(chǎn)者之間互斥
- 消費(fèi)者和消費(fèi)者之間互斥
-
生產(chǎn)者和消費(fèi)者之間互斥并同步
生產(chǎn)者消費(fèi)者模型的高效性
-
由于生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳輸解耦,生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理數(shù)據(jù),而是直接將數(shù)據(jù)存入容器,消費(fèi)者不需要向生產(chǎn)者請求數(shù)據(jù),而是直接從容器里獲取數(shù)據(jù),因此即便在生產(chǎn)者和消費(fèi)者的效率不對等且多變的情況下,多個生產(chǎn)者依然可以高效專一地并發(fā)生產(chǎn)數(shù)據(jù),多個消費(fèi)者依然可以高效專一地并發(fā)處理數(shù)據(jù),使得系統(tǒng)整體的并發(fā)量得到提高
基于環(huán)形隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型中的數(shù)據(jù)容器
- 環(huán)形隊(duì)列中,消費(fèi)者訪問隊(duì)列的頭指針進(jìn)行數(shù)據(jù)出隊(duì)操作,生產(chǎn)者訪問隊(duì)列的尾指針進(jìn)行數(shù)據(jù)入隊(duì)操作
-
兩把互斥鎖分別保證消費(fèi)者和消費(fèi)者之間的互斥以及生產(chǎn)者和生產(chǎn)者之間的互斥,兩個信號量實(shí)現(xiàn)消費(fèi)者和生產(chǎn)者之間的互斥與同步
- 當(dāng)環(huán)形隊(duì)列既不為空也不為滿時(shí),支持一個生產(chǎn)者和一個消費(fèi)者并發(fā)地進(jìn)行數(shù)據(jù)的存取
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
//環(huán)形隊(duì)列默認(rèn)容量
const static int defaultcap = 5;
template<class T>
class RingQueue{
private:
std::vector<T> ringqueue_;
int cap_; //容器的容量
int c_step_; // 消費(fèi)者環(huán)形隊(duì)列指針
int p_step_; // 生產(chǎn)者環(huán)形隊(duì)列指針
sem_t cdata_sem_; // 消費(fèi)者的數(shù)據(jù)資源
sem_t pspace_sem_; // 生產(chǎn)者的空間資源
pthread_mutex_t c_mutex_; //消費(fèi)者與消費(fèi)者之間的互斥鎖
pthread_mutex_t p_mutex_; //生產(chǎn)者與生產(chǎn)者之間的互斥鎖
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
//初始化生產(chǎn)者和消費(fèi)者的信號量-->消費(fèi)者一開始沒有信號量資源,生產(chǎn)者一開始具有最多的空間資源
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
//信號量的資源狀態(tài)可以區(qū)分隊(duì)列的空和滿
void Push(const T &in)
{
//生產(chǎn)者等待空間資源
sem_wait(&pspace_sem_);
pthread_mutex_lock(&p_mutex_);
ringqueue_[p_step_] = in;
p_step_++;
p_step_ %= cap_;
pthread_mutex_unlock(&p_mutex_);
//生產(chǎn)完數(shù)據(jù)后增加消費(fèi)者的信號量資源
sem_post(&cdata_sem_);
}
void Pop(T *out)
{
//消費(fèi)者等待數(shù)據(jù)資源
sem_wait(&cdata_sem_);
pthread_mutex_lock(&c_mutex_);
*out = ringqueue_[c_step_];
c_step_++;
c_step_ %= cap_;
pthread_mutex_unlock(&c_mutex_);
//消費(fèi)完數(shù)據(jù)后增加生產(chǎn)者的信號量資源
sem_post(&pspace_sem_);
}
};
基于生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)單例線程池
文章來源:http://www.zghlxwxcb.cn/news/detail-804392.html
- 線程池將線程安全的數(shù)據(jù)容器和用容器組織起來的線程封裝在一起,系統(tǒng)啟動時(shí)完成線程的創(chuàng)建,系統(tǒng)關(guān)閉時(shí)再銷毀線程,不僅可以有效提高系統(tǒng)的并發(fā)量同時(shí)可以避免頻繁創(chuàng)建和銷毀線程帶來的性能損失,組織起來的線程也更方便進(jìn)行管理和監(jiān)控.
#pragma once
#include <ctime>
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Mythread.cpp"
#include "sem_cpmodel.cpp"
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
//線程池默認(rèn)線程數(shù)
static const int defalutnum = 5;
template <class T>
class ThreadPool
{
private:
//用來管理線程的容器
std::vector<ThreadInfo> threads_;
//線程安全的環(huán)形隊(duì)列
RingQueue<T> tasks_;
//懶漢單例模式靜態(tài)指針
static ThreadPool<T> * TPtr;
static pthread_mutex_t _Slock;
public:
std::string GetThreadName(pthread_t tid){
for(const auto &ti : threads_){
if(ti.tid == tid) return ti.name;
}
return "None";
}
static ThreadPool<T> * Getinstance(){
//多套一層判斷提高并發(fā)效率
if(TPtr == nullptr){
//加鎖保護(hù)靜態(tài)指針
pthread_mutex_lock(&_Slock);
if(TPtr == nullptr){
std :: cout << "SingleTon Created...." << std ::endl;
TPtr = new ThreadPool<T>();
}
pthread_mutex_unlock(&_Slock);
}
return TPtr;
}
private:
ThreadPool(int num = defalutnum) : threads_(num)
{}
ThreadPool(ThreadPool<T>&& TP) = delete;
ThreadPool(const ThreadPool<T>& TP) = delete;
ThreadPool<T>& operator=(const ThreadPool<T>& TP) = delete;
public:
//線程的執(zhí)行函數(shù)
static void *HandlerTask(void *args){
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true){
//線程從環(huán)形隊(duì)列中獲取任務(wù)并執(zhí)行任務(wù)
T t;
tp->tasks_.Pop(&t);
//執(zhí)行任務(wù)代碼段-->根據(jù)業(yè)務(wù)需求編寫
std::cout << name << " run, "<< "result: " << t.GetResult() << std::endl;
}
}
//創(chuàng)建線程池中的線程
void Start(){
int num = threads_.size();
for (int i = 0; i < num; i++){
threads_[i].name = "thread-" + std::to_string(i + 1);
//線程函數(shù)參數(shù)傳遞this指針以訪問成員變量
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
//將任務(wù)存入環(huán)形隊(duì)列中
void Push(const T &t){
tasks_.Push(t);
}
};
//初始化靜態(tài)指針
template <class T>
ThreadPool<T> * ThreadPool<T>::TPtr = nullptr;
//初始化靜態(tài)鎖
template <class T>
pthread_mutex_t ThreadPool<T>::_Slock = PTHREAD_MUTEX_INITIALIZER;
-
并發(fā)測試:
文章來源地址http://www.zghlxwxcb.cn/news/detail-804392.html
到了這里,關(guān)于線程同步--生產(chǎn)者消費(fèi)者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!