国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

線程同步--生產(chǎn)者消費(fèi)者模型

這篇具有很好參考價(jià)值的文章主要介紹了線程同步--生產(chǎn)者消費(fèi)者模型。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

一.條件變量

  • 條件變量是線程間共享的全局變量,線程間可以通過條件變量進(jìn)行同步控制
  • 條件變量的使用必須依賴于互斥鎖以確保線程安全,線程申請了互斥鎖后,可以調(diào)用特定函數(shù)進(jìn)入條件變量等待隊(duì)列(同時(shí)釋放互斥鎖),其他線程則可以通過條件變量在特定的條件下喚醒該線程(喚醒后線程重新獲得互斥鎖),實(shí)現(xiàn)線程同步.
    • 例如一個線程訪問隊(duì)列時(shí),發(fā)現(xiàn)隊(duì)列為空,則它只能等待其它線程將數(shù)據(jù)添加到隊(duì)列中,這種情況就需要用到條件變量.
    • 線程同步的概念:在保證數(shù)據(jù)安全的前提下,讓線程能夠按照某種特定的順序訪問共享資源,從而有效避免線程饑餓問題(饑餓問題指線程長時(shí)間等待資源而無法被調(diào)度).
      線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

pthread線程庫提供的條件變量操作

//聲明全局互斥鎖并初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//聲明全局條件變量并初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 線程等待條件:
任務(wù)線程代碼{
	pthread_mutex_lock(&mutex);
	if(條件為假)
		pthread_cond_wait(&cond, &mutex);//等待時(shí)會釋放互斥鎖,等待完后自動加鎖
	//訪問共享資源....
	pthread_mutex_unlock(&mutex);
}

線程調(diào)用pthread_cond_wait等待時(shí),該接口會釋放互斥鎖,等待結(jié)束后自動加鎖

  • 控制線程給條件變量發(fā)送喚醒信號
控制線程代碼{
	if(滿足喚醒條件){
		pthread_mutex_lock(&mutex);
		pthread_cond_signal(cond);
		pthread_mutex_unlock(&mutex);
	}
}

喚醒操作加鎖是為了避免信號丟失

  • 示例:
#include <iostream>
#include <unistd.h>
#include <pthread.h>

int cnt = 0;
//聲明全局互斥鎖并初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//聲明全局條件變量并初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *Count(void * args)
{
    //線程分離,無需主線程等待
    pthread_detach(pthread_self());
    uint64_t number = (uint64_t)args;
    std::cout << "pthread: " << number << " create success" << std::endl;

    while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);               
        std::cout << "pthread: " << number << " , cnt: " << cnt++ << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    for(uint64_t i = 0; i < 4; i++)
    {
        pthread_t tid;
        pthread_create(&tid, nullptr, Count, (void*)i);
        usleep(1000);
    }
    sleep(3);
    std::cout << "main thread ctrl begin: " << std::endl;

    while(true) 
    {
        sleep(1);
        //喚醒在cond的等待隊(duì)列中等待的一個線程,默認(rèn)都是第一個
        pthread_mutex_lock(&mutex);
        pthread_cond_signal(&cond); 
        pthread_mutex_unlock(&mutex);
        //按順序喚醒在cond的等待隊(duì)列中的所有線程
        //pthread_cond_broadcast(&cond);
        std::cout << "signal one thread..." << std::endl;
    }

    return 0;
}
  • 線程同步過程圖解:
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux
  • 條件變量和鎖的銷毀:
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);

二.生產(chǎn)者消費(fèi)者模型

  • 生產(chǎn)者消費(fèi)者模型是一種多線程并發(fā)協(xié)作的設(shè)計(jì)框架,生產(chǎn)者負(fù)責(zé)生成并發(fā)送數(shù)據(jù),消費(fèi)者負(fù)責(zé)接收并處理數(shù)據(jù).
  • 生產(chǎn)者和消費(fèi)者之間存在一個數(shù)據(jù)容器作為緩沖區(qū),生產(chǎn)者生產(chǎn)的數(shù)據(jù)存入容器中,消費(fèi)者需要的數(shù)據(jù)從容器中獲取,實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳輸解耦
  • 數(shù)據(jù)容器由互斥鎖保護(hù),同一個時(shí)刻只能有一個線程訪問數(shù)據(jù)容器,生產(chǎn)者和消費(fèi)者之間通過條件變量(或信號量)實(shí)現(xiàn)同步
  • 對于數(shù)據(jù)容器的訪問,生產(chǎn)者和消費(fèi)者遵循三個原則:
    • 生產(chǎn)者和生產(chǎn)者之間互斥
    • 消費(fèi)者和消費(fèi)者之間互斥
    • 生產(chǎn)者和消費(fèi)者之間互斥并同步
      線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

生產(chǎn)者消費(fèi)者模型的高效性

  • 由于生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳輸解耦,生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理數(shù)據(jù),而是直接將數(shù)據(jù)存入容器,消費(fèi)者不需要向生產(chǎn)者請求數(shù)據(jù),而是直接從容器里獲取數(shù)據(jù),因此即便在生產(chǎn)者和消費(fèi)者的效率不對等且多變的情況下,多個生產(chǎn)者依然可以高效專一地并發(fā)生產(chǎn)數(shù)據(jù),多個消費(fèi)者依然可以高效專一地并發(fā)處理數(shù)據(jù),使得系統(tǒng)整體的并發(fā)量得到提高
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

基于環(huán)形隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型中的數(shù)據(jù)容器

  • 環(huán)形隊(duì)列中,消費(fèi)者訪問隊(duì)列的頭指針進(jìn)行數(shù)據(jù)出隊(duì)操作,生產(chǎn)者訪問隊(duì)列的尾指針進(jìn)行數(shù)據(jù)入隊(duì)操作
  • 兩把互斥鎖分別保證消費(fèi)者和消費(fèi)者之間的互斥以及生產(chǎn)者和生產(chǎn)者之間的互斥,兩個信號量實(shí)現(xiàn)消費(fèi)者和生產(chǎn)者之間的互斥與同步
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

  • 當(dāng)環(huán)形隊(duì)列既不為空也不為滿時(shí),支持一個生產(chǎn)者和一個消費(fèi)者并發(fā)地進(jìn)行數(shù)據(jù)的存取
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>

//環(huán)形隊(duì)列默認(rèn)容量
const static int defaultcap = 5;

template<class T>
class RingQueue{
private:
    std::vector<T> ringqueue_;
    int cap_;          //容器的容量

    int c_step_;       // 消費(fèi)者環(huán)形隊(duì)列指針
    int p_step_;       // 生產(chǎn)者環(huán)形隊(duì)列指針

    sem_t cdata_sem_;  // 消費(fèi)者的數(shù)據(jù)資源
    sem_t pspace_sem_; // 生產(chǎn)者的空間資源

    pthread_mutex_t c_mutex_;   //消費(fèi)者與消費(fèi)者之間的互斥鎖
    pthread_mutex_t p_mutex_;   //生產(chǎn)者與生產(chǎn)者之間的互斥鎖
public:
    RingQueue(int cap = defaultcap)
        :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
    {
        //初始化生產(chǎn)者和消費(fèi)者的信號量-->消費(fèi)者一開始沒有信號量資源,生產(chǎn)者一開始具有最多的空間資源
        sem_init(&cdata_sem_, 0, 0);
        sem_init(&pspace_sem_, 0, cap);

        pthread_mutex_init(&c_mutex_, nullptr);
        pthread_mutex_init(&p_mutex_, nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);

        pthread_mutex_destroy(&c_mutex_);
        pthread_mutex_destroy(&p_mutex_);
    }

    //信號量的資源狀態(tài)可以區(qū)分隊(duì)列的空和滿

    void Push(const T &in) 
    {
        //生產(chǎn)者等待空間資源
        sem_wait(&pspace_sem_);
        pthread_mutex_lock(&p_mutex_);
        ringqueue_[p_step_] = in;
        p_step_++;
        p_step_ %= cap_;
        pthread_mutex_unlock(&p_mutex_);
        //生產(chǎn)完數(shù)據(jù)后增加消費(fèi)者的信號量資源
        sem_post(&cdata_sem_);
    }
    void Pop(T *out)      
    {
        //消費(fèi)者等待數(shù)據(jù)資源
        sem_wait(&cdata_sem_);
        pthread_mutex_lock(&c_mutex_);
        *out = ringqueue_[c_step_];
        c_step_++;
        c_step_ %= cap_;
        pthread_mutex_unlock(&c_mutex_);
        //消費(fèi)完數(shù)據(jù)后增加生產(chǎn)者的信號量資源
        sem_post(&pspace_sem_);
    }
};

基于生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)單例線程池

線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

  • 線程池將線程安全的數(shù)據(jù)容器用容器組織起來的線程封裝在一起,系統(tǒng)啟動時(shí)完成線程的創(chuàng)建,系統(tǒng)關(guān)閉時(shí)再銷毀線程,不僅可以有效提高系統(tǒng)的并發(fā)量同時(shí)可以避免頻繁創(chuàng)建和銷毀線程帶來的性能損失,組織起來的線程也更方便進(jìn)行管理和監(jiān)控.
#pragma once
#include <ctime>
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Mythread.cpp"
#include "sem_cpmodel.cpp"

struct ThreadInfo
{
    pthread_t tid;
    std::string name;
};

//線程池默認(rèn)線程數(shù)
static const int defalutnum = 5;

template <class T>
class ThreadPool
{
private:
    //用來管理線程的容器
    std::vector<ThreadInfo> threads_;
    //線程安全的環(huán)形隊(duì)列
    RingQueue<T> tasks_;
    //懶漢單例模式靜態(tài)指針
    static ThreadPool<T> * TPtr;
    static pthread_mutex_t _Slock;
public:
    std::string GetThreadName(pthread_t tid){
        for(const auto &ti : threads_){
            if(ti.tid == tid) return ti.name;
        }
        return "None";
    }
    static ThreadPool<T> * Getinstance(){
        //多套一層判斷提高并發(fā)效率
        if(TPtr == nullptr){
            //加鎖保護(hù)靜態(tài)指針
            pthread_mutex_lock(&_Slock);
            if(TPtr == nullptr){
                std :: cout << "SingleTon Created...." << std ::endl;
                TPtr = new ThreadPool<T>();
            }
            pthread_mutex_unlock(&_Slock);
        }
        return TPtr;
    }
private:
    ThreadPool(int num = defalutnum) : threads_(num)
    {}
    ThreadPool(ThreadPool<T>&& TP) = delete;
    ThreadPool(const ThreadPool<T>& TP) = delete;
    ThreadPool<T>& operator=(const ThreadPool<T>& TP) = delete;
public:
    //線程的執(zhí)行函數(shù)
    static void *HandlerTask(void *args){
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        std::string name = tp->GetThreadName(pthread_self());
        while (true){
            //線程從環(huán)形隊(duì)列中獲取任務(wù)并執(zhí)行任務(wù)
            T t;
            tp->tasks_.Pop(&t);
            //執(zhí)行任務(wù)代碼段-->根據(jù)業(yè)務(wù)需求編寫
            std::cout << name << " run, "<< "result: " << t.GetResult() << std::endl;
        }
    }

    //創(chuàng)建線程池中的線程
    void Start(){
        int num = threads_.size();
        for (int i = 0; i < num; i++){
            threads_[i].name = "thread-" + std::to_string(i + 1);
            //線程函數(shù)參數(shù)傳遞this指針以訪問成員變量
            pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
        }
    }
    //將任務(wù)存入環(huán)形隊(duì)列中
    void Push(const T &t){
        tasks_.Push(t);
    }
};

//初始化靜態(tài)指針
template <class T>
ThreadPool<T> * ThreadPool<T>::TPtr = nullptr;
//初始化靜態(tài)鎖
template <class T>
pthread_mutex_t ThreadPool<T>::_Slock = PTHREAD_MUTEX_INITIALIZER;
  • 并發(fā)測試:
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux
    線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux

線程同步--生產(chǎn)者消費(fèi)者模型,青菜的Linux專欄,linux文章來源地址http://www.zghlxwxcb.cn/news/detail-804392.html

到了這里,關(guān)于線程同步--生產(chǎn)者消費(fèi)者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包