国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn)

這篇具有很好參考價值的文章主要介紹了Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

本文主要會結(jié)束消費者生產(chǎn)者模型,以及簡單線程池的實現(xiàn)。

1.消費者與生產(chǎn)者模型

之前我們學了條件變量和互斥等概念。條件變量本質(zhì)就是一個隊列,它會將因為某種條件不滿足不能往后執(zhí)行的線程添加到這個隊列中,避免線程做無用功,當條件滿足時,會將隊列中的線程重新喚醒繼續(xù)執(zhí)行。我們接下將利用條件變量和互斥鎖實現(xiàn)一個消費者與生產(chǎn)者模型。

消費者與生產(chǎn)者模型是什么呢?

在生活中,工廠將加工好的產(chǎn)品運到超市,超市將商品賣給消費者。這就是一個生產(chǎn)者消費者模型,這樣做的好處是消費者能方便買到商品,工廠也能將產(chǎn)品快速輸出沒有積壓貨物。這是一種高效的處理方式。在計算機中,也有這種消費者生產(chǎn)者模型,有些線負責從某種渠道拿到數(shù)據(jù),這就是生產(chǎn)者;有些線程處理其他線程手中拿到的數(shù)據(jù),這就是消費者。我們可以定義一個緩沖區(qū),相當于超市。一邊讓消費者線程處理數(shù)據(jù),一邊讓生產(chǎn)者線程產(chǎn)生數(shù)據(jù)。這就是消費者與生產(chǎn)者模型。

消費者與生產(chǎn)者模型的高效性體現(xiàn)在哪里呢?

我們?yōu)樯兑x一個緩沖區(qū)這么麻煩呢,這個緩沖區(qū)是怎么提高效率的呢?其實很簡單,負責拿到數(shù)據(jù)的線程,其實在拿到數(shù)據(jù)的時候需要處理時間,如果沒有緩沖區(qū),這個時候負責處理數(shù)據(jù)的線程就會陷入等待狀態(tài),執(zhí)行效率就大幅度降低,如果有了這種緩沖區(qū),那么久可以保證消費者線程時時刻刻都會有數(shù)據(jù)處理。這樣就提高了效率。

消費者生產(chǎn)者模型的注意事項

這個緩沖區(qū)需要被消費者和生產(chǎn)者看到,這就意味著這個緩沖區(qū)就是一個公共資源,也就是臨界資源。既然是多線程訪問這個臨界資源,這里就涉及到多線程的互斥與同步。怎么維護多線程之間的互斥與同步呢?我們分析一下消費者與生產(chǎn)者之間的關(guān)系。

首先,生產(chǎn)者與生產(chǎn)者之間的關(guān)系:互斥關(guān)系;畢竟緩沖區(qū)只有一個,只能允許一個線程去訪問這個資源。消費者與消費者的關(guān)系同理也是互斥。消費者與生產(chǎn)者的關(guān)系:互斥和同步。因為不管是消費者還是生產(chǎn)者,只能其中一個去訪問臨界資源,這就是互斥。只有生產(chǎn)者生產(chǎn)了數(shù)據(jù),消費者才能從緩沖區(qū)里獲得數(shù)據(jù)消費,這就是同步關(guān)系。簡單總結(jié)一下消費者和生產(chǎn)者模型就是3 2 1原則,即3種關(guān)系,2種角色,1種場所。

我們搞明白了上述的道理之后,就可以使用條件變量和互斥鎖簡單實現(xiàn)一下這個消費者于生產(chǎn)者模型了。我們可以用隊列來模擬這個緩沖區(qū),在定義兩個函數(shù),一個處理數(shù)據(jù)一個生產(chǎn)數(shù)據(jù),充當消費者和生產(chǎn)者。

BlockQueue.hpp

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
const int gacp=5;
template<class T>
class BlockQueue
{ 
  public:
    BlockQueue(const int cap=gacp )
    :_cap(gacp)
    {
         pthread_mutex_init(&_mutex,nullptr);
         pthread_cond_init(&_consumerCond,nullptr);
         pthread_cond_init(&_productorCond,nullptr);
    }
   ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumerCond);
        pthread_cond_destroy(&_productorCond);
    }
    void push(const T& in)
    {   
         pthread_mutex_lock(&_mutex);
         while(isFull())
         {
            pthread_cond_wait(&_productorCond,&_mutex);
         }
         _q.push(in);
         //if(_q.size()>=_cap/2)
         
         pthread_cond_signal(&_consumerCond);//喚醒消費者
         
         pthread_mutex_unlock(&_mutex);
    }
   bool isFull()
   {
     return _q.size()==_cap;
   }
   bool isEmpty()
   {
     return _q.empty();
   }
    void pop(T*out)
    {
        pthread_mutex_lock(&_mutex);
        while(isEmpty())
        {
            pthread_cond_wait(&_consumerCond,&_mutex);
        }
        *out=_q.front();
        _q.pop();
        
        pthread_cond_signal(&_productorCond);//喚醒生產(chǎn)者
        pthread_mutex_unlock(&_mutex);
    }
  
  
private:
  std::queue<T>_q;
  int _cap;
  pthread_mutex_t _mutex;
  pthread_cond_t _consumerCond;//消費者對應條件變量
  pthread_cond_t _productorCond;//生產(chǎn)者對應的條件變量
  


};

用C++queue容器來封裝這個阻塞隊列作為緩沖區(qū),push行為就是生產(chǎn)者的行為,pop行為就是消費者行為。我們使用條件變量,當隊列為滿時就不讓生產(chǎn)者生產(chǎn)了,這個時候讓生產(chǎn)行為處于等待狀態(tài),喚醒消費行為。當隊列為空時就不讓消費者消費了,這個時候讓消費行為處于等待狀態(tài),喚醒生產(chǎn)行為。

Task.hpp

#include<iostream>
#include<string>
class Task
{
  public:
  Task()
  {

  }
  Task(int x,int y,char op):_x(x),_y(y),_op(op){}
  ~Task(){}
  void operator()()
  {
    switch(_op)
    {
        case '+':
        _ret=_x+_y;
        break;

        case '-':
        _ret=_x-_y;
        break;
       
        case '*':
        _ret=_x*_y;
        break;
        
        case '/':
        { 
            if(_y==0)
            {
                _exitCode=-1;
            }
            else
           _ret=_x/_y;
        }
        break;
        default:
        break;
    }
  }
  std::string formatRet()
  {
     return std::to_string(_ret)+" "+'('+std::to_string(_exitCode)+')';
  }
  std::string formatArg()
  {
    return std::to_string(_x)+" " +_op+" "+std::to_string(_y)+'=';
  }
    private:
    int _x;
    int _y;
    char _op;
    int _ret;
    int _exitCode;


};

這個是封裝了一個任務類,模擬要處理的數(shù)據(jù)。這個任務類會產(chǎn)生一些需要計算的結(jié)果。將這些表達式和結(jié)果作為數(shù)據(jù)傳入阻塞隊列中由生產(chǎn)者和消費者進行處理。

test.cc

#include"BlockQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
 #include<ctime>
#include<unistd.h>
 void *consumer(void*arg)
 {   
    sleep(1);
    BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
    while(1)
    { 
        Task t;
        //1.從blockqueue中獲取數(shù)據(jù)
        bq->pop(&t);
        t();
       //2.結(jié)合某種業(yè)務邏輯,處理數(shù)據(jù)
    std::cout<<"consumer data:"<<t.formatArg()<<t.formatRet()<<std::endl;

    }
 }
 void *producter(void*arg)
 {  
    BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
    std::string opers ="+-*/";
    while(1)
    {  
        //1.從某種渠道獲取數(shù)據(jù)
        int x=rand()%10+1;
        int y=rand()%10+1;
        char op=opers[rand()%opers.size()];
        //2.將數(shù)據(jù)加入blockqueue中,完成生產(chǎn)過程
        Task t(x,y,op);
        bq->push(t);
        std::cout<<"prducter data:"<<t.formatArg()<<"=?"<<std::endl;
    }
 }
 int main()
 {
    //多產(chǎn)生單消費
    srand((uint64_t)time(nullptr));
    BlockQueue<Task>*bq=new BlockQueue<Task>();
    pthread_t  c[2],p[3];
    pthread_create(&c[0],nullptr,consumer,bq);
    pthread_create(&c[1],nullptr,consumer,bq);
    pthread_create(&p[0],nullptr,consumer,bq);
    pthread_create(&p[1],nullptr,producter,bq);
    pthread_create(&p[2],nullptr,producter,bq);
    
    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr);
    pthread_join(p[2],nullptr);
  

    return 0;
 }

這里就是創(chuàng)建線程模擬生產(chǎn)者和生產(chǎn)者在緩沖區(qū)中處理數(shù)據(jù)。

Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn),Liunx操作系統(tǒng),Liunx,學習,線程,操作系統(tǒng),線程池

以上就是生產(chǎn)者于消費者模型了,用多線程去模擬消費者和生產(chǎn)者,因為這里只有一份臨界資源就是阻塞隊列,我們已經(jīng)對這個阻塞隊列進行加鎖保護了,同時又維護好了線程之間的同步互斥關(guān)系。所以一批線程是沒問題的。

2.信號量

信號量之前就提到過,是用來描述臨界資源數(shù)數(shù)目,它的工作機制和買票看電影類似,是一種資源預訂機制。它本質(zhì)就是一個計數(shù)器,P操作就是減減,V操作就是加加,PV操作本身是原子的,信號量申請成功表示資源可用,否則表示資源不可用。使用信號量就是相當于把對資源的判斷轉(zhuǎn)化成對信號量的申請行為。之前的互斥鎖就可以看做是二元信號量,由1到0,再由0到1。信號量也是要被其他線程或者進程所看見的,本質(zhì)上也是一種臨界資源,所以在申請信號量和釋放信號量的時候也需要加鎖保護。

1.信號量的接口

初始化信號量的函數(shù)sem_init

int sem_init(sem_t *sem, int pshared, unsigned int value);

參數(shù)說明:sem:需要初始化的信號量。pshared:傳入0值表示線程間共享,傳入非零值表示進程間共享。value:信號量的初始值(計數(shù)器的初始值)。

銷毀信號量的函數(shù)sem_destroy

int sem_destroy(sem_t *sem);

參數(shù)說明:sem:需要銷毀的信號量。

等待信號量的函數(shù)sem_wait(相當于P操作

int sem_wait(sem_t *sem);

sem:需要等待的信號量。

發(fā)布信號量(釋放信號量)函數(shù)sem_pos(相當于V操作

int sem_post(sem_t *sem);

sem:需要發(fā)布的信號量。

2.使用環(huán)形隊列模擬生產(chǎn)者消費者模型

這個信號量是對臨界資源數(shù)目的描述,也就是說在某些情況下臨界資源是可以拆成一份份的來訪問。我們使用環(huán)形隊列模擬生產(chǎn)者消費者模型時,就可以使用信號量。循環(huán)隊列每個位置可以看成一份臨界資源,為保證消費者和生產(chǎn)者能安全高效的生產(chǎn)的數(shù)據(jù),我們規(guī)定生產(chǎn)者先生產(chǎn)者數(shù)據(jù),消費者再消費數(shù)據(jù),同事生產(chǎn)者不能套圈生產(chǎn)數(shù)據(jù),不然之前生產(chǎn)的數(shù)據(jù)可能還沒有被消費就會被新數(shù)據(jù)覆蓋,造成數(shù)據(jù)丟失。消費者也不能追上生產(chǎn)者,因為它沒有數(shù)據(jù)需要消費還可能導致一些異常情況出現(xiàn)。

Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn),Liunx操作系統(tǒng),Liunx,學習,線程,操作系統(tǒng),線程池

生產(chǎn)消費關(guān)系分析

1.生產(chǎn)者和消費者關(guān)系的資源不一樣,生產(chǎn)者關(guān)心空間,消費者關(guān)心數(shù)據(jù)。2.只要信號量不為0,表示資源可用,線程可訪問。3.環(huán)形隊列我們只要訪問不同的區(qū)域,生產(chǎn)行為和消費行為是可以同時進行的。4.當隊列為空的時生產(chǎn)者先行,當隊列為滿的時候消費者先行。生產(chǎn)者不能套圈消費者,消費者不能超過生產(chǎn)者。

Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn),Liunx操作系統(tǒng),Liunx,學習,線程,操作系統(tǒng),線程池

當我們將資源整體使用的時候就優(yōu)先考慮互斥鎖,當資源可以分成多份使用的時候就優(yōu)先考慮信號量。

RingQueue.h

#pragma once

#include<iostream>
#include<vector>
#include <semaphore.h>

static const int N=5;

template<class T>
class RingQueue
{
private:
void P(sem_t& s)
{
   sem_wait(&s);                                                                                                                
}
void V(sem_t& s)
{
     sem_post(&s);
}
public:
void push(const T&in)
{
    //生產(chǎn)
    P(_space_sem);
    //一定有對應的空間資源,不用判斷 
    _ring[_p_step++]=in;
    _p_step%=_cap;
    V(_data_sem);

     

}
void pop(T*out)
{
   //消費
   P(_data_sem);
   *out=_ring[_c_step++];
   _c_step%=_cap;
   V(_space_sem);



}
RingQueue(int num=N):_ring(num),_cap(num)
{
      sem_init(&_data_sem,0,0);
      sem_init(&_space_sem,0,num);
      _c_step=_p_step=0;
}
~RingQueue()
{
       sem_destroy(&_space_sem);
       sem_destroy(&_data_sem);
}


private:
   std::vector<int>_ring; 
   int _cap;//環(huán)形隊列容量
   sem_t _data_sem ;//數(shù)據(jù)信號量,消費者關(guān)心
   sem_t _space_sem ;//空間信號量,生產(chǎn)者關(guān)心
   int _c_step;//消費位置
   int _p_step;//生產(chǎn)位置
};

test.cc

#include<pthread.h>
#include<memory>
#include<unistd.h>
#include<sys/types.h>
#include"RingQueue.hpp"
using namespace std;

void *consumer(void *arg)
{
  RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
  while(1)
  {
    int data=0;
    rq->pop(&data);
    cout<<"consumer done: "<<data<<endl;

  }
}
void *productor(void *arg)
{
 RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
  while(1)
  {
    int data=rand()%10+1;
    rq->push(data);
    cout<<"productor done: "<<data<<endl;
    sleep(1);
  }
}
int main()
{   
    srand(time(nullptr));

    RingQueue<int>*rq=new RingQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,rq);
    pthread_create(&p,nullptr,productor,rq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
}

Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn),Liunx操作系統(tǒng),Liunx,學習,線程,操作系統(tǒng),線程池

因為這里是單生產(chǎn)單消費者,所以對信號量的訪問都沒有做加鎖保護,我們可以將其改造成多生產(chǎn)者多消費者模型。

Task.hpp

#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }
       //模擬處理過程所花費的時間
        usleep(10000);
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

將之前的模擬計算任務的頭文件拿過來,充當數(shù)據(jù)分配給線程處理。

RingQueue

#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

static const int N = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }
    void V(sem_t &s)
    {
        sem_post(&s);
    }
    void Lock(pthread_mutex_t &m)
    {
        pthread_mutex_lock(&m);
    }
    void Unlock(pthread_mutex_t &m)
    {
        pthread_mutex_unlock(&m);
    }

public:
    RingQueue(int num = N) : _ring(num), _cap(num)
    {
        sem_init(&_data_sem, 0, 0);
        sem_init(&_space_sem, 0, num);
        _c_step = _p_step = 0;

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }
    // 生產(chǎn)
    void push(const T &in)
    {  
        //先申請信號量,進行資源分配,再加鎖效率高
        
        P(_space_sem);  
        Lock(_p_mutex); 
        _ring[_p_step++] = in;
        _p_step %= _cap;
        Unlock(_p_mutex);
        V(_data_sem);
    }
    // 消費
    void pop(T *out)
    {  //先申請信號量,進行資源分配,再加鎖效率高
        P(_data_sem);
        Lock(_c_mutex); 
        *out = _ring[_c_step++];
        _c_step %= _cap;
        Unlock(_c_mutex);
        V(_space_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);

        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

private:
    std::vector<T> _ring;
    int _cap;         // 環(huán)形隊列容器大小
    sem_t _data_sem;  // 消費者關(guān)心 數(shù)據(jù)信號量
    sem_t _space_sem; // 生產(chǎn)者關(guān)心 空間信號量
    int _c_step;      // 消費位置
    int _p_step;      // 生產(chǎn)位置

    pthread_mutex_t _c_mutex;//消費者鎖
    pthread_mutex_t _p_mutex;//生產(chǎn)者鎖
};

這里是多線程并發(fā)訪問就需要加鎖處理,消費者與生產(chǎn)者之間的關(guān)系可以通過信號量來維護,消費者與消費者,生產(chǎn)者與生產(chǎn)者的關(guān)系是互斥的,需要加鎖來訪問資源確保任意一個資源只能由一個線程訪問,申請資源進行資源的分配,所以這里就定義了兩把鎖,分別作用與生產(chǎn)者和消費者。

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>

using namespace std;

const char *ops = "+-*/%";

void *consumerRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        Task t;
        rq->pop(&t);
        t();
        cout << "consumer done, 處理完成的任務是: " << t.formatRes() << endl;
    }
}

void *productorRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 100;
        char op = ops[(x + y) % strlen(ops)];
        Task t(x, y, op);
        rq->push(t);
        cout << "productor done, 生產(chǎn)的任務是: " << t.formatArg() << endl;
    }
}

int main()
{
    srand(time(nullptr));
    RingQueue<Task> *rq = new RingQueue<Task>();
    
    pthread_t c[3], p[2];
    for (int i = 0; i < 3; i++)
        {
            pthread_create(c + i, nullptr, consumerRoutine, rq);
        }
    for (int i = 0; i < 2; i++)
        { 
         pthread_create(p + i, nullptr, productorRoutine, rq);
        }

    for (int i = 0; i < 3; i++)
        {
            pthread_join(c[i], nullptr);
        }
    for (int i = 0; i < 2; i++)
        {
            pthread_join(p[i], nullptr);
        }

    delete rq;
    return 0;
}

Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn),Liunx操作系統(tǒng),Liunx,學習,線程,操作系統(tǒng),線程池

簡單總結(jié)一下:當我們將臨界資源整體使用后,優(yōu)先考慮互斥鎖,當臨界資源可以被拆分使用的時候就要考慮信使用號量。我們在信號量之后加鎖比較合適,這樣提前將資源分配好,這樣鎖申請成功之后就能直接使用臨界資源了,一定程度上提高了效率。同時消費者和生產(chǎn)者還有一個高效點,就是在處理的數(shù)據(jù)的過程中是沒有枷鎖的嗎,比如消費者函數(shù)在處理計算表達式的時候就沒有加鎖的,只有訪問隊列的時候才觸及加鎖,同樣的生產(chǎn)者函數(shù),產(chǎn)生計算表達式的式的時候也沒有加鎖。這才是高效的地方。

3.簡單實現(xiàn)線程池

什么是線程池:一種線程使用模式。線程過多會帶來調(diào)度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務。這避免了在處理短時間任務時創(chuàng)建與銷毀線程的代價。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度??捎镁€程數(shù)量應該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡sockets等的數(shù)量。簡單來數(shù)線程池就是提前構(gòu)建一批線程處理任務,這樣免去了構(gòu)建線程的開銷,有任務推送即可馬上處理。下面我們簡單模擬一下線程池的實現(xiàn)。

我們可以用vector容器作為存分線程的載體,在vector中存放一批線程,同時將需要需要處理的任務放置在任務隊列中,我們可以將先將推送的任務添加至任務隊列中,再將任務隊列中線程取出分配給線程。這樣就能維護一個線程池了。同時如果任務隊列中無任務了,就需要將線程池中處理任務的邏輯給休眠,等到有任務時在將其換新進行任務的處理。實際上,這個線程池就相當于消費者,緩沖區(qū)是封裝在線程池中。這也是消費者與生產(chǎn)者模型。生產(chǎn)者我們可以直接自定義的導入任務放入線程池中。

ThreaPool.hpp

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"

const static int N = 5;

template <class T>
class ThreadPool
{
public:
    ThreadPool(int num = N) : _num(num)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    pthread_mutex_t* getlock()
    {
        return &_lock;
    }
    void threadWait()
    {
        pthread_cond_wait(&_cond, &_lock);
    }
    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    bool isEmpty()
    {
        return _tasks.empty();
    }
    T popTask()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }
    static void threadRoutine(void *args)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        while (true)
        {
            // 1. 檢測有沒有任務
            // 2. 有:處理
            // 3. 無:等待
            //必定加鎖
            T t;
            {
                LockGuard lockguard(tp->getlock());
                while (tp->isEmpty())
                {
                    tp->threadWait();
                }
                t = tp->popTask(); // 從公共區(qū)域拿到私有區(qū)域
            }
           
            t();
            std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
            
        }
    }
    void init()
    {
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(Thread(i, threadRoutine, this));
        }
    }
    void start()
    {
        for (auto &t : _threads)
        {
            t.run();
        }
    }
    void check()
    {
        for (auto &t : _threads)
        {
            std::cout << t.threadname() << " running..." << std::endl;
        }
    }
    void pushTask(const T &t)
    {
        LockGuard lockgrard(&_lock);
        _tasks.push(t);
        threadWakeup();
    }
    ~ThreadPool()
    {
        for (auto &t : _threads)
        {
            t.join();
        }
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

private:
    std::vector<Thread> _threads;//線程池
    int _num;

    std::queue<T> _tasks; //任務隊列

    pthread_mutex_t _lock;
    pthread_cond_t _cond;
};

接下來就是對一些的簡單封裝了,比如簡單封裝一下互斥鎖和線程以及任務。

lockGuard.hpp

#pragma once
#include <iostream>
#include <pthread.h>

class Mutex // 自己不維護鎖,有外部傳入
{
public:
    Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmutex);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *_pmutex;
};

class LockGuard // 自己不維護鎖,有外部傳入
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        _mutex.lock();
    }
    ~LockGuard()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

Task

#include <iostream>
#include <string>
#include <unistd.h>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
    {
    }
    void operator()()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _exitCode = -1;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _exitCode = -2;
            else
                _result = _x % _y;
        }
        break;
        default:
            break;
        }

        usleep(100000);
    }
    std::string formatArg()
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }
    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _op;

    int _result;
    int _exitCode;
};

thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>

class Thread
{
public:
    typedef enum
    {
        NEW = 0,
        RUNNING,
        EXITED
    } ThreadStatus;
    typedef void (*func_t)(void *);

public:
    Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args)
    {
        char name[128];
        snprintf(name, sizeof(name), "thread-%d", num);
        _name = name;
    }
    int status() { return _status; }
    std::string threadname() { return _name; }
    pthread_t threadid()
    {
        if (_status == RUNNING)
            return _tid;
        else
        {
            return 0;
        }
    }
  
    static void *runHelper(void *args)
    {
        Thread *ts = (Thread*)args; // 就拿到了當前對象
        (*ts)();
        return nullptr;
    }
    void operator ()() //仿函數(shù)
    {
        if(_func != nullptr) _func(_args);
    }
    void run()
    {
        int n = pthread_create(&_tid, nullptr, runHelper, this);
        if(n != 0) exit(1);
        _status = RUNNING;
    }
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        if( n != 0)
        {
            std::cerr << "main thread join thread " << _name << " error" << std::endl;
            return;
        }
        _status = EXITED;
    }
    ~Thread()
    {
    }

private:
    pthread_t _tid;
    std::string _name;
    func_t _func; // 線程未來要執(zhí)行的回調(diào)
    void *_args;
    ThreadStatus _status;
};

接著就是調(diào)用邏輯了。

#include"ThreadPool.hpp"
#include<memory>

int main()
{
    std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
    tp->init();
    tp->start();
    while(1)
    {
     int x, y;
        char op;
        std::cout << "please Enter x> ";
        std::cin >> x;
        std::cout << "please Enter y> ";
        std::cin >> y;
        std::cout << "please Enter op(+-*/%)> ";
        std::cin >> op;

        Task t(x, y, op);
        tp->pushTask(t);

        // 充當生產(chǎn)者, 從讀取數(shù)據(jù),構(gòu)建成為任務,推送給線程池
         sleep(1);
    
    }
}

Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn),Liunx操作系統(tǒng),Liunx,學習,線程,操作系統(tǒng),線程池

以上就是對線程池的簡單實現(xiàn),這將之前寫過的東西都給串起來了。

4.補充說明

STL中的容器是否是線程安全的?不是.
原因是, STL 的設計初衷是將性能挖掘到極致, 而一旦涉及到加鎖保證線程安全, 會對性能造成巨大的影響.而且對于不同的容器, 加鎖方式的不同, 性能可能也不同(例如hash表的鎖表和鎖桶).因此 STL 默認不是線程安全. 如果需要在多線程環(huán)境下使用, 往往需要調(diào)用者自行保證線程安全.

其他常見的幾種鎖

悲觀鎖:在每次取數(shù)據(jù)時,總是擔心數(shù)據(jù)會被其他線程修改,所以會在取數(shù)據(jù)前先加鎖(讀鎖,寫鎖等),當其他線程想要訪問數(shù)據(jù)時,被阻塞掛起。
樂觀鎖:每次取數(shù)據(jù)時候,總是樂觀的認為數(shù)據(jù)不會被其他線程修改,因此不上鎖。但是在更新數(shù)據(jù)前,會判斷其他數(shù)據(jù)在更新前有沒有對數(shù)據(jù)進行修改。主要采用兩種方式:版本號機制和CAS操作。
CAS操作:當需要更新數(shù)據(jù)時,判斷當前內(nèi)存值和之前取得的值是否相等。如果相等則用新值更新。若不等則失敗,失敗則重試,一般是一個自旋的過程,即不斷重試。

以上內(nèi)容如有問題,歡迎指正!文章來源地址http://www.zghlxwxcb.cn/news/detail-545588.html

到了這里,關(guān)于Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • 多線程之生產(chǎn)者消費者

    目的是回顧多線程的幾個api 多生產(chǎn)者+多消費者+共享池

    2024年02月07日
    瀏覽(27)
  • Linux——生產(chǎn)者消費者模型

    Linux——生產(chǎn)者消費者模型

    目錄 一.為何要使用生產(chǎn)者消費者模型 ?二.生產(chǎn)者消費者模型優(yōu)點 ?三.基于BlockingQueue的生產(chǎn)者消費者模型 1.BlockingQueue——阻塞隊列 2.實現(xiàn)代碼 ?四.POSIX信號量 五.基于環(huán)形隊列的生產(chǎn)消費模型 生產(chǎn)者消費者模式就是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者

    2024年02月08日
    瀏覽(22)
  • kafka生產(chǎn)者消費者練習

    需求:寫一個生產(chǎn)者,不斷的去生產(chǎn)用戶行為數(shù)據(jù),寫入到kafka的一個topic中 生產(chǎn)的數(shù)據(jù)格式: 造數(shù)據(jù) {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    瀏覽(28)
  • linux:生產(chǎn)者消費者模型

    linux:生產(chǎn)者消費者模型

    個人主頁 : 個人主頁 個人專欄 : 《數(shù)據(jù)結(jié)構(gòu)》 《C語言》《C++》《Linux》 本文是對于生產(chǎn)者消費者模型的知識總結(jié) 生產(chǎn)者消費者模型就是通過一個容器來解決生產(chǎn)者消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而是通過之間的容器來進行通訊,所以生產(chǎn)者

    2024年04月15日
    瀏覽(19)
  • 線程同步--生產(chǎn)者消費者模型

    線程同步--生產(chǎn)者消費者模型

    條件變量是 線程間共享的全局變量 ,線程間可以通過條件變量進行同步控制 條件變量的使用必須依賴于互斥鎖以確保線程安全,線程申請了互斥鎖后,可以調(diào)用特定函數(shù) 進入條件變量等待隊列(同時釋放互斥鎖) ,其他線程則可以通過條件變量在特定的條件下喚醒該線程( 喚醒后線

    2024年01月19日
    瀏覽(25)
  • 【JavaEE】生產(chǎn)者消費者模式

    【JavaEE】生產(chǎn)者消費者模式

    作者主頁: paper jie_博客 本文作者:大家好,我是paper jie,感謝你閱讀本文,歡迎一建三連哦。 本文于《JavaEE》專欄,本專欄是針對于大學生,編程小白精心打造的。筆者用重金(時間和精力)打造,將基礎知識一網(wǎng)打盡,希望可以幫到讀者們哦。 其他專欄:《MySQL》《C語言》

    2024年02月05日
    瀏覽(18)
  • rabbitmq消費者與生產(chǎn)者

    rabbitmq消費者與生產(chǎn)者

    在第一次學習rabbitmq的時候,遇到了許多不懂得 第一步導包 第二步新增生產(chǎn)者 在這里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填寫自己的隊列名稱,如果你的為”/“則填寫\\\'\\\'/\\\'\\\' 第三步新增消費者 消息獲取成功 注意如果你用的云服務器需要打開這兩個端口 5672 15672 如果你使

    2024年02月11日
    瀏覽(26)
  • LabVIEW建立生產(chǎn)者消費者

    LabVIEW建立生產(chǎn)者消費者

    LabVIEW建立生產(chǎn)者消費者 生產(chǎn)者/消費者設計模式由并行循環(huán)組成,這些循環(huán)分為兩類:生產(chǎn)者循環(huán)和消費者循環(huán)。生產(chǎn)者循環(huán)和消費者循環(huán)間的通信可以使用隊列或通道連線來實現(xiàn)。 隊列 LabVIEW內(nèi)置的隊列操作VI可在函數(shù)選板數(shù)據(jù)通信隊列操作(?Functions?Data?Communication??Que

    2024年02月07日
    瀏覽(19)
  • 【設計模式】生產(chǎn)者消費者模型

    【設計模式】生產(chǎn)者消費者模型

    帶你輕松理解生產(chǎn)者消費者模型!生產(chǎn)者消費者模型可以說是同步與互斥最典型的應用場景了!文末附有模型簡單實現(xiàn)的代碼,若有疑問可私信一起討論。 生產(chǎn)者消費者模式就是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過

    2023年04月17日
    瀏覽(28)
  • python rocketmq生產(chǎn)者消費者

    安裝依賴包 生產(chǎn)者 需要注意的是假如你用的java SDK 需要只是UNinname 我們可以看到下列代碼設置了tag以及key,在頁面可以根據(jù)key查找消息 消費方式PullConsumer(全部消費)(可重復消費) 消費方式PushConsumer(即時消費)(不可重復消費) 生產(chǎn)者發(fā)送消息選擇隊列,以及設置順

    2024年02月14日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包