前言
本文主要會結(jié)束消費者生產(chǎn)者模型,以及簡單線程池的實現(xiàn)。
1.消費者與生產(chǎn)者模型
之前我們學了條件變量和互斥等概念。條件變量本質(zhì)就是一個隊列,它會將因為某種條件不滿足不能往后執(zhí)行的線程添加到這個隊列中,避免線程做無用功,當條件滿足時,會將隊列中的線程重新喚醒繼續(xù)執(zhí)行。我們接下將利用條件變量和互斥鎖實現(xiàn)一個消費者與生產(chǎn)者模型。
消費者與生產(chǎn)者模型是什么呢?
在生活中,工廠將加工好的產(chǎn)品運到超市,超市將商品賣給消費者。這就是一個生產(chǎn)者消費者模型,這樣做的好處是消費者能方便買到商品,工廠也能將產(chǎn)品快速輸出沒有積壓貨物。這是一種高效的處理方式。在計算機中,也有這種消費者生產(chǎn)者模型,有些線負責從某種渠道拿到數(shù)據(jù),這就是生產(chǎn)者;有些線程處理其他線程手中拿到的數(shù)據(jù),這就是消費者。我們可以定義一個緩沖區(qū),相當于超市。一邊讓消費者線程處理數(shù)據(jù),一邊讓生產(chǎn)者線程產(chǎn)生數(shù)據(jù)。這就是消費者與生產(chǎn)者模型。
消費者與生產(chǎn)者模型的高效性體現(xiàn)在哪里呢?
我們?yōu)樯兑x一個緩沖區(qū)這么麻煩呢,這個緩沖區(qū)是怎么提高效率的呢?其實很簡單,負責拿到數(shù)據(jù)的線程,其實在拿到數(shù)據(jù)的時候需要處理時間,如果沒有緩沖區(qū),這個時候負責處理數(shù)據(jù)的線程就會陷入等待狀態(tài),執(zhí)行效率就大幅度降低,如果有了這種緩沖區(qū),那么久可以保證消費者線程時時刻刻都會有數(shù)據(jù)處理。這樣就提高了效率。
消費者生產(chǎn)者模型的注意事項
這個緩沖區(qū)需要被消費者和生產(chǎn)者看到,這就意味著這個緩沖區(qū)就是一個公共資源,也就是臨界資源。既然是多線程訪問這個臨界資源,這里就涉及到多線程的互斥與同步。
怎么維護多線程之間的互斥與同步呢?
我們分析一下消費者與生產(chǎn)者之間的關(guān)系。
首先,生產(chǎn)者與生產(chǎn)者之間的關(guān)系:互斥關(guān)系;畢竟緩沖區(qū)只有一個,只能允許一個線程去訪問這個資源。消費者與消費者的關(guān)系同理也是互斥。消費者與生產(chǎn)者的關(guān)系:互斥和同步。因為不管是消費者還是生產(chǎn)者,只能其中一個去訪問臨界資源,這就是互斥。只有生產(chǎn)者生產(chǎn)了數(shù)據(jù),消費者才能從緩沖區(qū)里獲得數(shù)據(jù)消費,這就是同步關(guān)系。
簡單總結(jié)一下消費者和生產(chǎn)者模型就是3 2 1原則,即3種關(guān)系,2種角色,1種場所。
我們搞明白了上述的道理之后,就可以使用條件變量和互斥鎖簡單實現(xiàn)一下這個消費者于生產(chǎn)者模型了。我們可以用隊列來模擬這個緩沖區(qū),在定義兩個函數(shù),一個處理數(shù)據(jù)一個生產(chǎn)數(shù)據(jù),充當消費者和生產(chǎn)者。
BlockQueue.hpp
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
const int gacp=5;
template<class T>
class BlockQueue
{
public:
BlockQueue(const int cap=gacp )
:_cap(gacp)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_consumerCond,nullptr);
pthread_cond_init(&_productorCond,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_productorCond);
}
void push(const T& in)
{
pthread_mutex_lock(&_mutex);
while(isFull())
{
pthread_cond_wait(&_productorCond,&_mutex);
}
_q.push(in);
//if(_q.size()>=_cap/2)
pthread_cond_signal(&_consumerCond);//喚醒消費者
pthread_mutex_unlock(&_mutex);
}
bool isFull()
{
return _q.size()==_cap;
}
bool isEmpty()
{
return _q.empty();
}
void pop(T*out)
{
pthread_mutex_lock(&_mutex);
while(isEmpty())
{
pthread_cond_wait(&_consumerCond,&_mutex);
}
*out=_q.front();
_q.pop();
pthread_cond_signal(&_productorCond);//喚醒生產(chǎn)者
pthread_mutex_unlock(&_mutex);
}
private:
std::queue<T>_q;
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _consumerCond;//消費者對應條件變量
pthread_cond_t _productorCond;//生產(chǎn)者對應的條件變量
};
用C++queue容器來封裝這個阻塞隊列作為緩沖區(qū),push行為就是生產(chǎn)者的行為,pop行為就是消費者行為。我們使用條件變量,當隊列為滿時就不讓生產(chǎn)者生產(chǎn)了,這個時候讓生產(chǎn)行為處于等待狀態(tài),喚醒消費行為。當隊列為空時就不讓消費者消費了,這個時候讓消費行為處于等待狀態(tài),喚醒生產(chǎn)行為。
Task.hpp
#include<iostream>
#include<string>
class Task
{
public:
Task()
{
}
Task(int x,int y,char op):_x(x),_y(y),_op(op){}
~Task(){}
void operator()()
{
switch(_op)
{
case '+':
_ret=_x+_y;
break;
case '-':
_ret=_x-_y;
break;
case '*':
_ret=_x*_y;
break;
case '/':
{
if(_y==0)
{
_exitCode=-1;
}
else
_ret=_x/_y;
}
break;
default:
break;
}
}
std::string formatRet()
{
return std::to_string(_ret)+" "+'('+std::to_string(_exitCode)+')';
}
std::string formatArg()
{
return std::to_string(_x)+" " +_op+" "+std::to_string(_y)+'=';
}
private:
int _x;
int _y;
char _op;
int _ret;
int _exitCode;
};
這個是封裝了一個任務類,模擬要處理的數(shù)據(jù)。這個任務類會產(chǎn)生一些需要計算的結(jié)果。將這些表達式和結(jié)果作為數(shù)據(jù)傳入阻塞隊列中由生產(chǎn)者和消費者進行處理。
test.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>
void *consumer(void*arg)
{
sleep(1);
BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
while(1)
{
Task t;
//1.從blockqueue中獲取數(shù)據(jù)
bq->pop(&t);
t();
//2.結(jié)合某種業(yè)務邏輯,處理數(shù)據(jù)
std::cout<<"consumer data:"<<t.formatArg()<<t.formatRet()<<std::endl;
}
}
void *producter(void*arg)
{
BlockQueue<Task>*bq=static_cast< BlockQueue<Task>*>(arg);
std::string opers ="+-*/";
while(1)
{
//1.從某種渠道獲取數(shù)據(jù)
int x=rand()%10+1;
int y=rand()%10+1;
char op=opers[rand()%opers.size()];
//2.將數(shù)據(jù)加入blockqueue中,完成生產(chǎn)過程
Task t(x,y,op);
bq->push(t);
std::cout<<"prducter data:"<<t.formatArg()<<"=?"<<std::endl;
}
}
int main()
{
//多產(chǎn)生單消費
srand((uint64_t)time(nullptr));
BlockQueue<Task>*bq=new BlockQueue<Task>();
pthread_t c[2],p[3];
pthread_create(&c[0],nullptr,consumer,bq);
pthread_create(&c[1],nullptr,consumer,bq);
pthread_create(&p[0],nullptr,consumer,bq);
pthread_create(&p[1],nullptr,producter,bq);
pthread_create(&p[2],nullptr,producter,bq);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(p[2],nullptr);
return 0;
}
這里就是創(chuàng)建線程模擬生產(chǎn)者和生產(chǎn)者在緩沖區(qū)中處理數(shù)據(jù)。
以上就是生產(chǎn)者于消費者模型了,用多線程去模擬消費者和生產(chǎn)者,因為這里只有一份臨界資源就是阻塞隊列,我們已經(jīng)對這個阻塞隊列進行加鎖保護了,同時又維護好了線程之間的同步互斥關(guān)系。所以一批線程是沒問題的。
2.信號量
信號量之前就提到過,是用來描述臨界資源數(shù)數(shù)目,它的工作機制和買票看電影類似,是
一種資源預訂機制。
它本質(zhì)就是一個計數(shù)器,P操作就是減減,V操作就是加加,PV操作本身是原子的,信號量申請成功表示資源可用,否則表示資源不可用。使用信號量就是相當于把對資源的判斷轉(zhuǎn)化成對信號量的申請行為。之前的互斥鎖就可以看做是二元信號量,由1到0,再由0到1。信號量也是要被其他線程或者進程所看見的,本質(zhì)上也是一種臨界資源,所以在申請信號量和釋放信號量的時候也需要加鎖保護。
1.信號量的接口
初始化信號量的函數(shù)sem_init
int sem_init(sem_t *sem, int pshared, unsigned int value);
參數(shù)說明:sem:需要初始化的信號量。pshared:傳入0值表示線程間共享,傳入非零值表示進程間共享。value:信號量的初始值(計數(shù)器的初始值)。
銷毀信號量的函數(shù)sem_destroy
int sem_destroy(sem_t *sem);
參數(shù)說明:sem:需要銷毀的信號量。
等待信號量的函數(shù)sem_wait(相當于P操作
)
int sem_wait(sem_t *sem);
sem:需要等待的信號量。
發(fā)布信號量(釋放信號量)函數(shù)sem_pos(相當于V操作
)
int sem_post(sem_t *sem);
sem:需要發(fā)布的信號量。
2.使用環(huán)形隊列模擬生產(chǎn)者消費者模型
這個信號量是對臨界資源數(shù)目的描述,也就是說在某些情況下臨界資源是可以拆成一份份的來訪問。我們使用環(huán)形隊列模擬生產(chǎn)者消費者模型時,就可以使用信號量。循環(huán)隊列每個位置可以看成一份臨界資源,為保證消費者和生產(chǎn)者能安全高效的生產(chǎn)的數(shù)據(jù),我們規(guī)定生產(chǎn)者先生產(chǎn)者數(shù)據(jù),消費者再消費數(shù)據(jù),同事生產(chǎn)者不能套圈生產(chǎn)數(shù)據(jù),不然之前生產(chǎn)的數(shù)據(jù)可能還沒有被消費就會被新數(shù)據(jù)覆蓋,造成數(shù)據(jù)丟失。消費者也不能追上生產(chǎn)者,因為它沒有數(shù)據(jù)需要消費還可能導致一些異常情況出現(xiàn)。
生產(chǎn)消費關(guān)系分析
1.生產(chǎn)者和消費者關(guān)系的資源不一樣,生產(chǎn)者關(guān)心空間,消費者關(guān)心數(shù)據(jù)。2.只要信號量不為0,表示資源可用,線程可訪問。3.環(huán)形隊列我們只要訪問不同的區(qū)域,生產(chǎn)行為和消費行為是可以同時進行的。4.當隊列為空的時生產(chǎn)者先行,當隊列為滿的時候消費者先行。生產(chǎn)者不能套圈消費者,消費者不能超過生產(chǎn)者。
當我們將資源整體使用的時候就優(yōu)先考慮互斥鎖,當資源可以分成多份使用的時候就優(yōu)先考慮信號量。
RingQueue.h
#pragma once
#include<iostream>
#include<vector>
#include <semaphore.h>
static const int N=5;
template<class T>
class RingQueue
{
private:
void P(sem_t& s)
{
sem_wait(&s);
}
void V(sem_t& s)
{
sem_post(&s);
}
public:
void push(const T&in)
{
//生產(chǎn)
P(_space_sem);
//一定有對應的空間資源,不用判斷
_ring[_p_step++]=in;
_p_step%=_cap;
V(_data_sem);
}
void pop(T*out)
{
//消費
P(_data_sem);
*out=_ring[_c_step++];
_c_step%=_cap;
V(_space_sem);
}
RingQueue(int num=N):_ring(num),_cap(num)
{
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,num);
_c_step=_p_step=0;
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
}
private:
std::vector<int>_ring;
int _cap;//環(huán)形隊列容量
sem_t _data_sem ;//數(shù)據(jù)信號量,消費者關(guān)心
sem_t _space_sem ;//空間信號量,生產(chǎn)者關(guān)心
int _c_step;//消費位置
int _p_step;//生產(chǎn)位置
};
test.cc
#include<pthread.h>
#include<memory>
#include<unistd.h>
#include<sys/types.h>
#include"RingQueue.hpp"
using namespace std;
void *consumer(void *arg)
{
RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
while(1)
{
int data=0;
rq->pop(&data);
cout<<"consumer done: "<<data<<endl;
}
}
void *productor(void *arg)
{
RingQueue<int>*rq=static_cast<RingQueue<int>*>(arg);
while(1)
{
int data=rand()%10+1;
rq->push(data);
cout<<"productor done: "<<data<<endl;
sleep(1);
}
}
int main()
{
srand(time(nullptr));
RingQueue<int>*rq=new RingQueue<int>();
pthread_t c,p;
pthread_create(&c,nullptr,consumer,rq);
pthread_create(&p,nullptr,productor,rq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
}
因為這里是單生產(chǎn)單消費者,所以對信號量的訪問都沒有做加鎖保護,我們可以將其改造成多生產(chǎn)者多消費者模型。
Task.hpp
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
//模擬處理過程所花費的時間
usleep(10000);
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
將之前的模擬計算任務的頭文件拿過來,充當數(shù)據(jù)分配給線程處理。
RingQueue
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
static const int N = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
public:
RingQueue(int num = N) : _ring(num), _cap(num)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, num);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
// 生產(chǎn)
void push(const T &in)
{
//先申請信號量,進行資源分配,再加鎖效率高
P(_space_sem);
Lock(_p_mutex);
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mutex);
V(_data_sem);
}
// 消費
void pop(T *out)
{ //先申請信號量,進行資源分配,再加鎖效率高
P(_data_sem);
Lock(_c_mutex);
*out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ring;
int _cap; // 環(huán)形隊列容器大小
sem_t _data_sem; // 消費者關(guān)心 數(shù)據(jù)信號量
sem_t _space_sem; // 生產(chǎn)者關(guān)心 空間信號量
int _c_step; // 消費位置
int _p_step; // 生產(chǎn)位置
pthread_mutex_t _c_mutex;//消費者鎖
pthread_mutex_t _p_mutex;//生產(chǎn)者鎖
};
這里是多線程并發(fā)訪問就需要加鎖處理,消費者與生產(chǎn)者之間的關(guān)系可以通過信號量來維護,消費者與消費者,生產(chǎn)者與生產(chǎn)者的關(guān)系是互斥的,需要加鎖來訪問資源確保任意一個資源只能由一個線程訪問,申請資源進行資源的分配,所以這里就定義了兩把鎖,分別作用與生產(chǎn)者和消費者。
Main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
using namespace std;
const char *ops = "+-*/%";
void *consumerRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 處理完成的任務是: " << t.formatRes() << endl;
}
}
void *productorRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生產(chǎn)的任務是: " << t.formatArg() << endl;
}
}
int main()
{
srand(time(nullptr));
RingQueue<Task> *rq = new RingQueue<Task>();
pthread_t c[3], p[2];
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, consumerRoutine, rq);
}
for (int i = 0; i < 2; i++)
{
pthread_create(p + i, nullptr, productorRoutine, rq);
}
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 2; i++)
{
pthread_join(p[i], nullptr);
}
delete rq;
return 0;
}
簡單總結(jié)一下:
當我們將臨界資源整體使用后,優(yōu)先考慮互斥鎖,當臨界資源可以被拆分使用的時候就要考慮信使用號量。我們在信號量之后加鎖比較合適,這樣提前將資源分配好,這樣鎖申請成功之后就能直接使用臨界資源了,一定程度上提高了效率。
同時消費者和生產(chǎn)者還有一個高效點,就是在處理的數(shù)據(jù)的過程中是沒有枷鎖的嗎,比如消費者函數(shù)在處理計算表達式的時候就沒有加鎖的,只有訪問隊列的時候才觸及加鎖,同樣的生產(chǎn)者函數(shù),產(chǎn)生計算表達式的式的時候也沒有加鎖。這才是高效的地方。
3.簡單實現(xiàn)線程池
什么是線程池:一種線程使用模式。線程過多會帶來調(diào)度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務。這避免了在處理短時間任務時創(chuàng)建與銷毀線程的代價。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度??捎镁€程數(shù)量應該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡sockets等的數(shù)量。
簡單來數(shù)線程池就是提前構(gòu)建一批線程處理任務,這樣免去了構(gòu)建線程的開銷,有任務推送即可馬上處理。
下面我們簡單模擬一下線程池的實現(xiàn)。
我們可以用vector容器作為存分線程的載體,在vector中存放一批線程,同時將需要需要處理的任務放置在任務隊列中,我們可以將先將推送的任務添加至任務隊列中,再將任務隊列中線程取出分配給線程。這樣就能維護一個線程池了。同時如果任務隊列中無任務了,就需要將線程池中處理任務的邏輯給休眠,等到有任務時在將其換新進行任務的處理。實際上,這個線程池就相當于消費者,緩沖區(qū)是封裝在線程池中。這也是消費者與生產(chǎn)者模型。生產(chǎn)者我們可以直接自定義的導入任務放入線程池中。
ThreaPool.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"
const static int N = 5;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N) : _num(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
pthread_mutex_t* getlock()
{
return &_lock;
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
static void threadRoutine(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 檢測有沒有任務
// 2. 有:處理
// 3. 無:等待
//必定加鎖
T t;
{
LockGuard lockguard(tp->getlock());
while (tp->isEmpty())
{
tp->threadWait();
}
t = tp->popTask(); // 從公共區(qū)域拿到私有區(qū)域
}
t();
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
}
}
void init()
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(Thread(i, threadRoutine, this));
}
}
void start()
{
for (auto &t : _threads)
{
t.run();
}
}
void check()
{
for (auto &t : _threads)
{
std::cout << t.threadname() << " running..." << std::endl;
}
}
void pushTask(const T &t)
{
LockGuard lockgrard(&_lock);
_tasks.push(t);
threadWakeup();
}
~ThreadPool()
{
for (auto &t : _threads)
{
t.join();
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread> _threads;//線程池
int _num;
std::queue<T> _tasks; //任務隊列
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
接下來就是對一些的簡單封裝了,比如簡單封裝一下互斥鎖和線程以及任務。
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex // 自己不維護鎖,有外部傳入
{
public:
Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
{}
void lock()
{
pthread_mutex_lock(_pmutex);
}
void unlock()
{
pthread_mutex_unlock(_pmutex);
}
~Mutex()
{}
private:
pthread_mutex_t *_pmutex;
};
class LockGuard // 自己不維護鎖,有外部傳入
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
Task
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>
class Thread
{
public:
typedef enum
{
NEW = 0,
RUNNING,
EXITED
} ThreadStatus;
typedef void (*func_t)(void *);
public:
Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args)
{
char name[128];
snprintf(name, sizeof(name), "thread-%d", num);
_name = name;
}
int status() { return _status; }
std::string threadname() { return _name; }
pthread_t threadid()
{
if (_status == RUNNING)
return _tid;
else
{
return 0;
}
}
static void *runHelper(void *args)
{
Thread *ts = (Thread*)args; // 就拿到了當前對象
(*ts)();
return nullptr;
}
void operator ()() //仿函數(shù)
{
if(_func != nullptr) _func(_args);
}
void run()
{
int n = pthread_create(&_tid, nullptr, runHelper, this);
if(n != 0) exit(1);
_status = RUNNING;
}
void join()
{
int n = pthread_join(_tid, nullptr);
if( n != 0)
{
std::cerr << "main thread join thread " << _name << " error" << std::endl;
return;
}
_status = EXITED;
}
~Thread()
{
}
private:
pthread_t _tid;
std::string _name;
func_t _func; // 線程未來要執(zhí)行的回調(diào)
void *_args;
ThreadStatus _status;
};
接著就是調(diào)用邏輯了。
#include"ThreadPool.hpp"
#include<memory>
int main()
{
std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->init();
tp->start();
while(1)
{
int x, y;
char op;
std::cout << "please Enter x> ";
std::cin >> x;
std::cout << "please Enter y> ";
std::cin >> y;
std::cout << "please Enter op(+-*/%)> ";
std::cin >> op;
Task t(x, y, op);
tp->pushTask(t);
// 充當生產(chǎn)者, 從讀取數(shù)據(jù),構(gòu)建成為任務,推送給線程池
sleep(1);
}
}
以上就是對線程池的簡單實現(xiàn),這將之前寫過的東西都給串起來了。
4.補充說明
STL中的容器是否是線程安全的?不是.
原因是, STL 的設計初衷是將性能挖掘到極致, 而一旦涉及到加鎖保證線程安全, 會對性能造成巨大的影響.而且對于不同的容器, 加鎖方式的不同, 性能可能也不同(例如hash表的鎖表和鎖桶).因此 STL 默認不是線程安全. 如果需要在多線程環(huán)境下使用, 往往需要調(diào)用者自行保證線程安全.
其他常見的幾種鎖
悲觀鎖:在每次取數(shù)據(jù)時,總是擔心數(shù)據(jù)會被其他線程修改,所以會在取數(shù)據(jù)前先加鎖(讀鎖,寫鎖等),當其他線程想要訪問數(shù)據(jù)時,被阻塞掛起。
樂觀鎖:每次取數(shù)據(jù)時候,總是樂觀的認為數(shù)據(jù)不會被其他線程修改,因此不上鎖。但是在更新數(shù)據(jù)前,會判斷其他數(shù)據(jù)在更新前有沒有對數(shù)據(jù)進行修改。主要采用兩種方式:版本號機制和CAS操作。
CAS操作:當需要更新數(shù)據(jù)時,判斷當前內(nèi)存值和之前取得的值是否相等。如果相等則用新值更新。若不等則失敗,失敗則重試,一般是一個自旋的過程,即不斷重試。文章來源:http://www.zghlxwxcb.cn/news/detail-545588.html
以上內(nèi)容如有問題,歡迎指正!文章來源地址http://www.zghlxwxcb.cn/news/detail-545588.html
到了這里,關(guān)于Liunx下的消費者與生產(chǎn)者模型與簡單線程池的實現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!