国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)

這篇具有很好參考價(jià)值的文章主要介紹了Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、了解生產(chǎn)者消費(fèi)者模型

舉個(gè)例子:學(xué)生要買東西,一般情況下都會(huì)直接聯(lián)系廠商,因?yàn)橘I的商品不多,對(duì)于供貨商來說交易成本太高,所以有了交易場(chǎng)所超市這個(gè)媒介的存在。目的就是為了集中需求,分發(fā)產(chǎn)品。
Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)

消費(fèi)者與生產(chǎn)者之間通過了超市進(jìn)行交易。當(dāng)生產(chǎn)者不需要的時(shí)候,廠商可以繼續(xù)生產(chǎn),當(dāng)廠商不再生產(chǎn)的時(shí)候消費(fèi)者購買商品!

上述生產(chǎn)的過程和消費(fèi)的過程互相影響的程度很低——解耦
臨時(shí)的保存產(chǎn)品的場(chǎng)所——緩沖區(qū)

函數(shù)調(diào)用:main函數(shù)通過用戶輸入生產(chǎn)了數(shù)據(jù),用變量保存了數(shù)據(jù),要調(diào)用的函數(shù)消費(fèi)了數(shù)據(jù),當(dāng)main函數(shù)調(diào)用func函數(shù),main函數(shù)就會(huì)阻塞等待func函數(shù)返回,這種情況稱為強(qiáng)耦合關(guān)系

利用生產(chǎn)者消費(fèi)者模式可以解決強(qiáng)耦合問題,將串行調(diào)用改為并行執(zhí)行,提高執(zhí)行效率,完成邏輯的解耦。
Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)

二、生產(chǎn)者與消費(fèi)者模型的幾種關(guān)系及特點(diǎn)

對(duì)消費(fèi)者與生產(chǎn)者模型,可以用以下321原則說明

  • 三種關(guān)系:生產(chǎn)者和生產(chǎn)者(互斥),消費(fèi)者和消費(fèi)者(互斥),生產(chǎn)者和消費(fèi)者(互斥&&同步),互斥保證共享資源的安全性,同步是為了提高訪問效率

  • 二種角色:生產(chǎn)者線程,消費(fèi)者線程

  • 一個(gè)交易場(chǎng)所:一段特定結(jié)構(gòu)的緩沖區(qū)

生產(chǎn)消費(fèi)模型的特點(diǎn)

  1. 未來生產(chǎn)線程和消費(fèi)線程進(jìn)行解耦

  2. 支持生產(chǎn)和消費(fèi)的一段時(shí)間的忙閑不均的問題(緩存區(qū)有數(shù)據(jù)有空間)

  3. 生產(chǎn)者專注生產(chǎn),消費(fèi)專注消費(fèi),提高效率

如果緩沖區(qū)滿了,生產(chǎn)者只能進(jìn)行等待,如果超市緩沖區(qū)為空,消費(fèi)者只能進(jìn)行等待。

三、BlockQueue(阻塞隊(duì)列)

3.1 基礎(chǔ)版阻塞隊(duì)列

阻塞隊(duì)列:阻塞隊(duì)列(Blocking Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu)

阻塞隊(duì)列為空時(shí),從阻塞隊(duì)列中獲取元素的線程將被阻塞,直到阻塞隊(duì)列被放入元素。
阻塞隊(duì)列已滿時(shí),往阻塞隊(duì)列放入元素的線程將被阻塞,直到有元素被取出。
Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)

單生產(chǎn)單消費(fèi)測(cè)試

//BlockQueue.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
using  namespace std;

const int gmaxcap=5;

template<class T>
class BlockQueue
{  
private:
    std::queue<T> q_;
    int maxcap_;//隊(duì)列容量
    pthread_mutex_t mutex_;
    pthread_cond_t  pcond_;//生產(chǎn)者對(duì)應(yīng)的條件變量
    pthread_cond_t  ccond_;//消費(fèi)者者對(duì)應(yīng)的條件變量
public:
    BlockQueue(const int& maxcap=gmaxcap)
    :maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&pcond_,nullptr);
        pthread_cond_init(&ccond_,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&pcond_);
        pthread_cond_destroy(&ccond_);
    }

    void push(const T &in)//輸入性參數(shù) &
    {
        pthread_mutex_lock(&mutex_);

        //1.判斷
        //細(xì)節(jié)2:充當(dāng)條件變量的語法必須是while,不能是if
        while(is_full())
        {   
            //細(xì)節(jié)1:該函數(shù)會(huì)以原子性的方式將鎖釋放,并將自己掛起
            //被喚醒的時(shí)候會(huì)自動(dòng)獲取傳入的鎖
            pthread_cond_wait(&pcond_,&mutex_);//緩沖區(qū)滿,生產(chǎn)者阻塞等待
        }
        //2.這一步一定沒有滿
        q_.push(in);
        //3.堵塞隊(duì)列一定有數(shù)據(jù)

        //細(xì)節(jié)3:?jiǎn)拘研袨榭梢苑旁诮怄i前也可以放在解鎖后
        pthread_cond_signal(&ccond_);//喚醒消費(fèi)者
        pthread_mutex_unlock(&mutex_);
        //pthread_cond_signal(&ccond_);//喚醒消費(fèi)者

    }
    void pop(T* out)//輸出型參數(shù):*
    {
        pthread_mutex_lock(&mutex_);
        //1.判斷
        while(is_empty())
        {
            pthread_cond_wait(&ccond_,&mutex_);//緩沖區(qū)空,消費(fèi)者阻塞等待
        }
        //2.這一步一定沒有滿
        *out = q_.front();
        q_.pop();

        //3.堵塞隊(duì)列一定沒有滿
        pthread_cond_signal(&pcond_);//喚醒生產(chǎn)者
        pthread_mutex_unlock(&mutex_);
    }
private:
    bool is_empty()
    {
        return q_.empty();
    }    
    bool is_full()
    {
        return q_.size()==maxcap_;
    } 
};
//Main.cc
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>


void* productor(void* bq_)//生產(chǎn)
{
    BlockQueue<int> * bq=static_cast<BlockQueue<int>*>(bq_);
    while (true)
    {
        int data=rand()%10+1;
        bq->push(data);
        std::cout<<"生產(chǎn)數(shù)據(jù): "<<data<<std::endl;
        
    }
    return nullptr;
}



void* consumer(void* bq_)//消費(fèi)
{
    BlockQueue<int> * bq=static_cast<BlockQueue<int>*>(bq_);
    while (true)
    {
        int data;
        bq->pop(&data);
        std::cout<<"消費(fèi)數(shù)據(jù): "<<data<<std::endl;
        sleep(1);
    }
    return nullptr;
}



int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<int> * bq=new BlockQueue<int>();

    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

控制生產(chǎn)速度,即每間隔1s生產(chǎn)一次,生產(chǎn)一個(gè)消費(fèi)一個(gè),而且消費(fèi)的都是最新的數(shù)據(jù)
Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)
控制消費(fèi)速度,即每間隔1s消費(fèi)一次,剛開始生產(chǎn)多個(gè),穩(wěn)定后生產(chǎn)一個(gè)消費(fèi)一個(gè),消費(fèi)的是以前的數(shù)據(jù)
Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)

以上代碼的三個(gè)細(xì)節(jié)

  • 細(xì)節(jié)一:pthread_cond_wait(&pcond_,&mutex_);第二個(gè)參數(shù)是鎖,該函數(shù)調(diào)用會(huì)以原子性的方式將鎖釋放,并將自己掛起;被喚醒的時(shí)候會(huì)自動(dòng)獲取傳入的鎖
  • 細(xì)節(jié)二:判斷空和滿的時(shí)候要用while,存在多個(gè)生產(chǎn)者因滿掛起后,消費(fèi)者使用一個(gè)后,同時(shí)喚醒所有生產(chǎn)者,導(dǎo)致數(shù)據(jù)多增加;
    pthread_cond_signal偽喚醒:假設(shè)生產(chǎn)者有5個(gè),消費(fèi)者只有一個(gè),消費(fèi)一下數(shù)據(jù),如果是pthread_cond_broadcast(),把5個(gè)線程同時(shí)喚醒,可是只需要生產(chǎn)一個(gè)數(shù)據(jù),而同時(shí)把5個(gè)線程喚醒,是if判斷的時(shí)候就會(huì)多次push溢出問題
    前提條件:每個(gè)生產(chǎn)者進(jìn)入函數(shù),拿到鎖資源后,判斷為假,調(diào)用wait(),釋放鎖資源讓其他線程進(jìn)來,到最后五個(gè)生產(chǎn)者者同時(shí)處于wait狀態(tài),消費(fèi)者將其全部喚醒,5個(gè)生產(chǎn)者挨個(gè)重新獲取鎖資源接著執(zhí)行代碼,就會(huì)導(dǎo)致上述問題
  • 細(xì)節(jié)三:?jiǎn)拘研袨榭梢苑旁诮怄i前也可以放在解鎖后。解鎖前喚醒:?jiǎn)拘阎竽硞€(gè)生產(chǎn)者得到鎖的優(yōu)先級(jí)高,消費(fèi)者釋放,生產(chǎn)者立馬拿到;解鎖后喚醒:隨機(jī)被某個(gè)消費(fèi)者拿走鎖,不影響

3.2 基于任務(wù)版的阻塞隊(duì)列

基于上述代碼,新建一個(gè)Task.hpp,用來給線程派發(fā)任務(wù)執(zhí)行任務(wù)

BlockQueue.hpp如上
/*****************/
#pragma once

#include <iostream>
#include <cstdio>
#include <functional>
class Task
{

public:
    using func_t =std::function<int(int,int,char)>;
    Task(){}
    Task(int x,int y,char op,func_t callback)
        :x_(x),y_(y),op_(op),callback_(callback)
    {

    }
    std::string operator()()
    {
        int result=callback_(x_,y_,op_);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
        return buffer;
    }

    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
        return buffer;
    }
private:
    int x_;
    int y_;
    char op_;
    func_t callback_;
};

/**************************/
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"

const std::string oper = "+-*/%"; 
int mymath(int x,int y,char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        break;
    }
    return result;
}

void* productor(void* bq_)//生產(chǎn)
{    
    BlockQueue<Task> * bq=static_cast<BlockQueue<Task>*>(bq_);
    while (true)
    {
        int x=rand()%100+1;
        int y=rand()%10;
        int operCode=rand() % oper.size();

        Task t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"生產(chǎn)任務(wù): "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}



void* consumer(void* bq_)//消費(fèi)
{
    BlockQueue<Task> * bq=static_cast<BlockQueue<Task>*>(bq_);
    while (true)
    {
        Task t;
        bq->pop(&t);
        std::cout<<"消費(fèi)任務(wù):"<<t()<<std::endl;
        //sleep(1);
    }
    return nullptr;
}



int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<Task> * bq=new BlockQueue<Task>();

    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}


3.3 進(jìn)階版生產(chǎn)消費(fèi)模型–生產(chǎn)、消費(fèi)、保存

任務(wù)目標(biāo):

  1. 生產(chǎn)者(線程1)生產(chǎn)任務(wù)加入到計(jì)算任務(wù)隊(duì)列中
  2. 消費(fèi)者&生產(chǎn)者(線程2)消費(fèi)計(jì)算隊(duì)列中任務(wù)并將計(jì)算結(jié)果推送到存儲(chǔ)任務(wù)隊(duì)列中
  3. 消費(fèi)者(線程3)消費(fèi)存儲(chǔ)任務(wù)隊(duì)列,將結(jié)果保存到文件中

Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)

設(shè)計(jì)思路

  1. 生產(chǎn)者productor將計(jì)算任務(wù)CalTask,push到計(jì)算隊(duì)列中

  2. 消費(fèi)者&生產(chǎn)者consumer獲取計(jì)算任務(wù)CalTask,并將計(jì)算任務(wù)結(jié)果結(jié)合Save方法構(gòu)造一個(gè)SaveTask對(duì)象,然后將這個(gè)對(duì)象push到存儲(chǔ)隊(duì)列中

  3. 消費(fèi)者saver拿到存儲(chǔ)任務(wù),通過回調(diào)函數(shù)將數(shù)據(jù)寫進(jìn)文件中

代碼實(shí)現(xiàn)如下:

/*Main.cc*/
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"
//定義一個(gè)隊(duì)列保存計(jì)算任務(wù)隊(duì)列和保存任務(wù)隊(duì)列
template<class C,class S>
class TwoBlockQueue
{
public:
    BlockQueue<C> * c_bq;
    BlockQueue<S> * s_bq;
};

void* productor(void* bqs)//生產(chǎn)
{    
    BlockQueue<CalTask> * bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->c_bq;
    while (true)
    {
        int x=rand()%100+1;
        int y=rand()%10;
        int operCode=rand() % oper.size();

        CalTask t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"productor->生產(chǎn)任務(wù): "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}



void* consumer(void* bqs)//消費(fèi)
{
    //拿到計(jì)算隊(duì)列
    BlockQueue<CalTask> * bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->c_bq;
    //拿到保存隊(duì)列
    BlockQueue<SaveTask> * save_bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->s_bq;
    while (true)
    {
        //得到任務(wù)并處理
        CalTask t;
        bq->pop(&t);
        string result=t();//

        std::cout<<"consumer->消費(fèi)任務(wù):"<<result<<std::endl;

        //存儲(chǔ)任務(wù)
        SaveTask save(result,Save);
        save_bq->push(save);
        std::cout<<"consumer->推送保存任務(wù)完成..."<<std::endl;

        //sleep(1);
    }
    return nullptr;
}

void* saver(void* bqs)
{
    BlockQueue<SaveTask> * save_bq=(static_cast<TwoBlockQueue<CalTask,SaveTask>*>(bqs))->s_bq;
    while (true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        cout<<"saver->保存任務(wù)完成"<<endl;
    }
    
    return nullptr;
}




int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    TwoBlockQueue<CalTask,SaveTask> bqs;
    bqs.c_bq=new BlockQueue<CalTask>();
    bqs.s_bq=new BlockQueue<SaveTask>();


    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,&bqs);
    pthread_create(&p,nullptr,productor,&bqs);
    pthread_create(&s,nullptr,saver,&bqs);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

Task.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <cstring>
#include <functional>
class CalTask//計(jì)算任務(wù)
{

public:
    using func_t =std::function<int(int,int,char)>;
    CalTask(){}
    CalTask(int x,int y,char op,func_t callback)
        :x_(x),y_(y),op_(op),callback_(callback)
    {

    }
    std::string operator()()
    {
        int result=callback_(x_,y_,op_);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=%d",x_,op_,y_,result);
        return buffer;
    }

    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d=?",x_,op_,y_);
        return buffer;
    }
private:
    int x_;
    int y_;
    char op_;
    func_t callback_;
};
const std::string oper = "+-*/%"; 
int mymath(int x,int y,char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        break;
    }
    return result;
}


class SaveTask
{
    typedef std::function<void(const std::string&)> func_t;
public:
    SaveTask(){}
    SaveTask(const std::string& message,func_t func)
    :message_(message),func_(func)
    {

    }
    void operator()()
    {
        func_(message_);
    }

private:
    std::string message_;
    func_t func_;
};
void Save(const std::string& message)
{
    const std::string target="./log.txt";
    FILE* fp=fopen(target.c_str(),"a+");
    if(!fp)
    {
        std::cerr<<"fopen error"<<endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);
    fclose(fp);
}

Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)

四、小結(jié)

阻塞隊(duì)列也適用于多生產(chǎn)者多消費(fèi)
在阻塞隊(duì)列中,無論外部線程再多,真正進(jìn)入到阻塞隊(duì)列里生產(chǎn)或消費(fèi)的線程永遠(yuǎn)只有一個(gè)。
在一個(gè)任務(wù)隊(duì)列中,有多個(gè)生產(chǎn)者與多個(gè)消費(fèi)者,由于有鎖的存在,所以任意時(shí)刻只有一個(gè)執(zhí)行流在阻塞隊(duì)列里放或者取。

生產(chǎn)消費(fèi)模型高效體現(xiàn)在哪里

高效并不是體現(xiàn)在從隊(duì)列中消費(fèi)數(shù)據(jù)高效!

而是我們可以讓一個(gè)、多個(gè)線程并發(fā)的同時(shí)計(jì)算多個(gè)任務(wù)!在計(jì)算多個(gè)任務(wù)的同時(shí),并不影響其他線程繼續(xù)從隊(duì)列里拿任務(wù)的過程。

也就是說,生產(chǎn)者消費(fèi)者模型的高效:可以在生產(chǎn)之前與消費(fèi)之后讓線程并行執(zhí)行

生產(chǎn)任務(wù)需要花費(fèi)時(shí)間,不是把任務(wù)放進(jìn)隊(duì)列就完事了;消費(fèi)任務(wù)也是需要時(shí)間的,不是把任務(wù)從隊(duì)列中拿出來就完事了,還要處理它,處理它期間不影響其它線程消費(fèi),反之亦然,這才是生產(chǎn)者與消費(fèi)者模型的高效體現(xiàn)!?。?span toymoban-style="hidden">文章來源地址http://www.zghlxwxcb.cn/news/detail-431270.html

到了這里,關(guān)于Linux之【多線程】生產(chǎn)者與消費(fèi)者模型&BlockQueue(阻塞隊(duì)列)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包