一、線程互斥
1.相關(guān)概念
1.臨界資源:多線程執(zhí)行流共享的資源,且一次只能允許一個執(zhí)行流訪問的資源就叫做臨界資源。(多線程、多進(jìn)程打印數(shù)據(jù))
2.臨界區(qū):每個線程內(nèi)部,訪問臨界資源的代碼,就叫做臨界區(qū)。
3.互斥:任何時刻,互斥保證有且只有一個執(zhí)行流進(jìn)入臨界區(qū),訪問臨界資源,通常對臨界資源起保護(hù)作用。
4.原子性:不會被任何調(diào)度機制打斷的操作,該操作只有兩態(tài),要么完成,要么不執(zhí)行 。
實現(xiàn)一個小實例
#include<iostream> #include<unistd.h> #include<stdio.h> using namespace std; int ticket=10000; void* threadRoutinue(void* args) { const char* name=static_cast<const char*>(args); while(true) { if(ticket>0) { usleep(1000);//模擬搶票花費時間 cout<<name<<"get a ticket: "<<ticket--<<endl; } else{ break; } } return nullptr; } int main() { //創(chuàng)建線程模擬搶票 pthread_t tid[4]; int n=sizeof(tid)/sizeof(tid[0]); for(int i=0;i<n;i++) { char buffer[64]; snprintf(buffer,sizeof(buffer),"thread_%d",i); pthread_create(tid+i,nullptr,threadRoutinue,buffer); } for(int i=0;i<n;i++) { pthread_join(tid[i],nullptr); } return 0; }
從程序中可以看到,票數(shù)到0的時候就沒有票了,線程就應(yīng)該退出了。
但是結(jié)果中,票數(shù)甚至被搶到了負(fù)數(shù),這是怎么回事。
這里提一個問題,這里對票(臨界資源)的訪問是原子的嗎?(是安全的嗎?) 答案肯定不是?。?/strong>
可能在一個線程A中,剛剛將tickets加載到內(nèi)存上,線程A就被切走了,這時線程A的數(shù)據(jù)和上下文被保存,線程A從CPU上被剝離。
線程B開始搶票,如果他的競爭力非常強,一次運行后搶到了1000張票。
線程B執(zhí)行完后線程A又來了,他會從上次執(zhí)行的地方繼續(xù)執(zhí)行,但是他上次保存的tickets的數(shù)據(jù)是10000,所以搶到了一張票后,將剩余的9999張票寫回內(nèi)存,本來線程B執(zhí)行完后還剩9000張票,但是線程A執(zhí)行完后剩余的票數(shù)反而增多了。
2.互斥鎖(mutex)
對于上面的搶票程序,要想使每個線程正確的搶票就要保證:當(dāng)一個線程在進(jìn)入到搶票環(huán)節(jié)時,其他線程不能進(jìn)行搶票。
所以就可以對搶票環(huán)節(jié)加互斥鎖。
pthread_mutex_init、pthread_mutex_destroy:對線程鎖進(jìn)行初始化和銷毀
#include <pthread.h> // pthread_mutex_t mutex: 鎖變量,所有線程都可看到 int pthread_mutex_destroy(pthread_mutex_t *mutex);// 銷毀鎖 int pthread_mutex_init(pthread_mutex_t *restrict mutex,constpthread_mutexattr_t *restrict attr);// 初始化鎖 // attr: 鎖屬性,我們傳入空指針就可 // 如果將鎖定義為靜態(tài)或者全局的,可以使用宏直接初始化,且不用銷毀 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock、int pthread_mutex_unlock:對線程進(jìn)行加鎖和解鎖
#include <pthread.h> int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
對搶票小demo進(jìn)行加鎖
#include<iostream> #include<unistd.h> #include<stdio.h> using namespace std; int ticket=10000; //臨界資源 pthread_mutex_t mutex; void* threadRoutinue(void* args) { const char* name=static_cast<const char*>(args); while(true) { pthread_mutex_lock(&mutex); if(ticket>0) { usleep(1000);//模擬搶票花費時間 cout<<name<<" get a ticket: "<<ticket--<<endl; pthread_mutex_unlock(&mutex); } else{ cout<<name<<"票搶完了"<<endl; pthread_mutex_unlock(&mutex); break; } usleep(1000); } return nullptr; } int main() { pthread_mutex_init(&mutex,nullptr); //創(chuàng)建線程模擬搶票 pthread_t tid[4]; int n=sizeof(tid)/sizeof(tid[0]); for(int i=0;i<n;i++) { char buffer[64]; snprintf(buffer,sizeof(buffer),"thread_%d",i); pthread_create(tid+i,nullptr,threadRoutinue,buffer); } for(int i=0;i<n;i++) { pthread_join(tid[i],nullptr); } pthread_mutex_destroy(&mutex); return 0; }
多線程臨界資源原子
細(xì)節(jié):
1.凡是訪問同一個臨界資源的線程,都要進(jìn)行加鎖保護(hù),而且必須加同一把鎖,這是一個規(guī)則,不能有例外
2.每一個線程訪問臨界資源之前,得加鎖,加鎖本質(zhì)是給 臨界區(qū)加鎖,加鎖的粒度盡量細(xì)一些。
3.線程訪問臨界區(qū)的時候,需要先加鎖 -> 所以線程都必須看到同一把鎖 -> 鎖本身就是公共資源 -> 鎖如何保證自己的安全? -> 加鎖和解鎖本身就是原子的。
4.臨界區(qū)可以是一行代碼,可以是一批代碼,a.線程可能被切換? 當(dāng)然可能 b.切換會有影響嘛? 沒有,因為一個線程申請一個鎖以后,該線程被臨時切換,其他任何線程沒有辦法進(jìn)入臨界區(qū),無法申請到鎖,所以無法訪問到臨界資源。
5.這也正是體現(xiàn)互斥帶來的串行化的表現(xiàn),站在其他線程的角度,對其他線程有意義的狀態(tài)是:鎖被申請(持有鎖),鎖被釋放(不持有鎖),原子性。
3.互斥鎖的原理
以搶票程序為例,當(dāng)線程需要訪問臨界資源時,需要先訪問mtx,為了所有的線程都能看到它,所以鎖肯定是全局的。
且鎖本身也是臨界資源。那么如何保證鎖本身是安全的,即獲取鎖的過程是安全的。
其原理是:加鎖(lock)、解鎖(unlock)的過程是原子的!
那怎樣才算是原子的呢:一行代碼被翻譯成匯編后只有一條匯編,就是原子的。
為了實現(xiàn)互斥鎖操作,大多數(shù)體系結(jié)構(gòu)都提供了swap或exchange指令。
該指令的作用是把寄存器和內(nèi)存單元的數(shù)據(jù)相交換,由于只有一條指令,保證了原子性。
即使是多處理器平臺,訪問內(nèi)存的 總線周期也有先后,一個處理器上的交換指令執(zhí)行時另一個處理器的交換指令只能等待總線周期。
當(dāng)線程申請到鎖之后,進(jìn)入到臨界區(qū)訪問臨界資源,這時線程也可能被切走,被切走后會保護(hù)上下文,而鎖數(shù)據(jù)也在上下文中。
所以鎖也被帶走了,所以即便是該線程被掛起了,其他線程也不能申請到鎖,也不能進(jìn)入臨界區(qū)。
必須等待擁有鎖的線程釋放鎖之后才能申請到鎖。
4.自定義封裝一個鎖
#pragma once
#include<iostream>
#include<pthread.h>
//封裝鎖
class _Mutex
{
public:
_Mutex(pthread_mutex_t* mutex):_mutex(mutex)
{}
void lock()
{
pthread_mutex_lock(_mutex);
}
void unlock()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t* _mutex;
};
class lockGuard
{
public:
lockGuard(pthread_mutex_t* mutex):_mutex(mutex)
{
_mutex.lock();
}
~lockGuard()
{
_mutex.unlock();
}
private:
_Mutex _mutex;
};
我們可以使用我們自己封裝的鎖解決搶票問題
二、可重入和線程安全
線程安全: 線程安全指的是在多線程編程中,多個線程對臨界資源進(jìn)行爭搶訪問而不會造成數(shù)據(jù)二義或程序邏輯混亂的情況。常見對全局變量或者靜態(tài)變量進(jìn)行操作,并且沒有鎖保護(hù)的情況下,會出現(xiàn)該問題。
重入: 同一個函數(shù)被不同的執(zhí)行流調(diào)用,當(dāng)前一個流程還沒有執(zhí)行完,就有其他的執(zhí)行流再次進(jìn)入,我們稱之為重入。一個函數(shù)在重入的情況下,運行結(jié)果不會出現(xiàn)任何不同或者任何問題,則該函數(shù)被稱為可重入函數(shù),否則,是不可重入函數(shù)
線程安全的實現(xiàn),通過同步與互斥實現(xiàn)
具體互斥的實現(xiàn)可以通過互斥鎖和信號量實現(xiàn)、而同步可以通過條件變量與信號量實現(xiàn)。
常見的線程不安全的情況:
不保護(hù)共享變量的函數(shù)
函數(shù)狀態(tài)隨著被調(diào)用,狀態(tài)發(fā)生變化的函數(shù)
返回指向靜態(tài)變量指針的函數(shù)
調(diào)用線程不安全函數(shù)的函數(shù)
常見不可重入的情況:
調(diào)用了malloc/free函數(shù),因為malloc函數(shù)是用全局鏈表來管理堆的
調(diào)用了標(biāo)準(zhǔn)I/O庫函數(shù),標(biāo)準(zhǔn)I/O庫的很多實現(xiàn)都以不可重入的方式使用全局?jǐn)?shù)據(jù)結(jié)構(gòu)
可重入函數(shù)體內(nèi)使用了靜態(tài)的數(shù)據(jù)結(jié)構(gòu)
可重入與線程安全聯(lián)系:
函數(shù)是可重入的,那就是線程安全的
函數(shù)是不可重入的,那就不能由多個線程使用,有可能引發(fā)線程安全問題
如果一個函數(shù)中有全局變量,那么這個函數(shù)既不是線程安全也不是可重入的。
可重入與線程安全區(qū)別:
可重入函數(shù)是線程安全函數(shù)的一種
線程安全不一定是可重入的,而可重入函數(shù)則一定是線程安全的。
如果將對臨界資源的訪問加上鎖,則這個函數(shù)是線程安全的,但如果這個重入函數(shù)鎖還未釋放則會產(chǎn)生死鎖
三、死鎖
死鎖概念
死鎖是指在一組進(jìn)程中的各個進(jìn)程均占有不會釋放的資源,但因互相申請被其他進(jìn)程所占用不會釋放的資 源而處于的一種永久等待狀態(tài)。
死鎖四個必要條件
互斥條件:一個資源每次只能被一個執(zhí)行流使用
請求與保持條件:一個執(zhí)行流因請求資源而阻塞時,對已獲得的資源保持不放
不剝奪條件:一個執(zhí)行流已獲得的資源,在末使用完之前,不能強行剝奪
循環(huán)等待條件:若干執(zhí)行流之間形成一種頭尾相接的循環(huán)等待資源的關(guān)系
如何避免死鎖
核心思想:破壞死鎖的4個必要條件中任意一個!
不加鎖
主動釋放鎖
按順序申請鎖
資源一次性分配
破壞死鎖的一個小demo(主動釋放鎖)
#include <iostream> #include<unistd.h> #include <pthread.h> using namespace std; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void *pthreadRoutinue(void *args) { pthread_mutex_lock(&mutex); //加鎖 cout<<"I get a mutex"<<endl; pthread_mutex_lock(&mutex); //產(chǎn)生死鎖 cout<<"i alive again"<<endl; return nullptr; } int main() { pthread_t pid; pthread_create(&pid, nullptr, pthreadRoutinue, nullptr); sleep(3); cout<<"main thread run"<<endl; pthread_mutex_unlock(&mutex);//主線程區(qū)解鎖 cout<<"main thread unlock"<<endl; sleep(3); return 0; }
四、線程同步
同步:在保證數(shù)據(jù)安全的前提下,讓線程能夠按照某種特定的順序訪問臨界資源,從而有效避免饑餓問 題,叫做同步 。
1.條件變量
概念
與互斥鎖不同,條件變量是用來等待而不是用來上鎖的。條件變量用來自動阻塞一個線程,直到某特殊情況發(fā)生為止。通常條件變量和互斥鎖同時使用。
條件變量使我們可以睡眠等待某種條件出現(xiàn)。條件變量是利用線程間共享的全局變量進(jìn)行同步的一種機制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起;另一個線程使"條件成立"(給出條件成立信號)。
條件變量接口
pthread_cond_init、pthread_cond_destroy:初始化、銷毀條件變量
#include <pthread.h> int pthread_cond_destroy(pthread_cond_t *cond); // pthread_cond_t:條件變量類型,類似pthread_mutex_t int pthread_cond_init(pthread_cond_t *restrict cond,constpthread_condattr_t *restrict attr); // 如果是靜態(tài)或全局的條件變量可使用宏初始化: pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_cond_wait、pthread_cond_signal:等待條件、喚醒線程
#include <pthread.h> // 等待條件滿足 int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); // 喚醒一個線程,在cond等待隊列里的第一個線程 int pthread_cond_signal(pthread_cond_t *cond); // 一次喚醒所有線程 int pthread_cond_broadcast(pthread_cond_t *cond); ```
demo
#include <iostream> #include<unistd.h> #include <pthread.h> using namespace std; #define num 5 int ticket =1000; pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond=PTHREAD_COND_INITIALIZER; void *active(void *args) { string name=static_cast<const char*>(args); while(true) { pthread_mutex_lock(&mutex); pthread_cond_wait(&cond,&mutex); //調(diào)用該函數(shù),會自己釋放鎖 cout<<name<<" 活動"<<endl; pthread_mutex_unlock(&mutex); } return nullptr; } int main() { pthread_t tids[num]; for(int i=0;i<num;i++) { char * name=new char[64]; snprintf(name,64,"thread-%d",i); //線程創(chuàng) pthread_create(tids+i,nullptr,active,name); } sleep(3); while(true) { cout<<"main thread wakeup thread..."<<endl; //pthread_cond_signal(&cond); //喚醒cond隊列中的一個線程 pthread_cond_broadcast(&cond); //將cond隊列中所以線程喚醒 sleep(1); } for(int i=0;i<num;i++) { pthread_join(tids[i],nullptr); //線程等待 } sleep(3); return 0; }
基于阻塞隊列實現(xiàn)生產(chǎn)者消費者模型
生產(chǎn)者消費者模式就是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。
生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。這個阻塞隊列就是用來給生產(chǎn)者和消費者解耦的。
生產(chǎn)者消費者模型的優(yōu)點:解耦 支持并發(fā) 支持忙閑不均
實則之前所講的進(jìn)程間通信中的管道通信就是一種生產(chǎn)者消費者模型,管道就是讓不同的進(jìn)程能夠看到同一份資源,且管道自帶同步和互斥的機制。進(jìn)程間通信的本質(zhì)其實就是生產(chǎn)者消費者模型。
代碼:
blockQueue.hpp
#pragma once #include <iostream> #include <queue> #include <pthread.h> const int gcap = 5; template <class T> class BlockQueue { public: BlockQueue(const int cap = gcap) : _cap(cap) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_consumerCond, nullptr); pthread_cond_init(&_productorCond, nullptr); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_consumerCond); pthread_cond_destroy(&_productorCond); } bool isFull() { return _cap == _q.size(); } void push(const T &in) { // 生產(chǎn) pthread_mutex_lock(&_mutex); while (isFull()) //細(xì)節(jié)1:使用while ,防止多線程被喚醒生產(chǎn)過多 { // 我們只能在臨界區(qū)內(nèi)部,判斷臨界資源是否就緒 注定了我們在當(dāng)前一定是持有鎖的 pthread_cond_wait(&_productorCond, &_mutex); // 如果隊列為滿,生產(chǎn)者線程休眠 ,此時持有鎖,wait會將鎖unlock // 當(dāng)線程醒來的時候,注定了繼續(xù)從臨界區(qū)內(nèi)部繼續(xù)運行,因為是在臨界區(qū)被切走的 // 注定了當(dāng)線程被喚醒的時候,繼續(xù)在pthread_cond_wait()函數(shù)繼續(xù)向后運行,又要重新申請鎖,申請成功才會徹底返回 } // 沒有滿,讓他繼續(xù)生產(chǎn) _q.push(in); //策略,喚醒消費者線程 pthread_cond_signal(&_consumerCond); pthread_mutex_unlock(&_mutex); } void pop(T *out) { pthread_mutex_lock(&_mutex); while (_q.empty()) //隊列為空 { pthread_cond_wait(&_consumerCond, &_mutex); } *out = _q.front(); _q.pop(); //策略,喚醒生產(chǎn)者 pthread_cond_signal(&_productorCond); pthread_mutex_unlock(&_mutex); } private: std::queue<T> _q; int _cap; pthread_mutex_t _mutex; pthread_cond_t _consumerCond; // 消費者對應(yīng)的條件變量 空 wait pthread_cond_t _productorCond; // 生產(chǎn)者對應(yīng)的條件變量 滿 wait }; ```
task.hpp
#pragma once #include <iostream> #include <string> class Task { public: Task() {} Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0) { } void operator()() { switch (_op) { case '+': _result = _x + _y; break; case '-': _result = _x - _y; break; case '*': _result = _x * _y; break; case '/': if(_y==0) _exitCode=-1; else _result = _x / _y; break; case '%': if(_y==0) _exitCode=-1; else _result = _x % _y; break; default: break; } } std::string formatArg() { return std::to_string(_x)+' '+ _op+ ' '+std::to_string(_y)+" = "; } std::string formatRes() { return std::to_string(_result) + "(" +std::to_string(_exitCode)+")"; } ~Task(){} private: int _x; int _y; char _op; int _result; int _exitCode; }; ```
main.cc
#include "blockQueue.hpp" #include"task.hpp" #include<ctime> #include<unistd.h> void *consumer(void *args) { BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); while(true) { sleep(1); Task t; //1.將數(shù)據(jù)從blockqueue中獲取 -- 獲取到數(shù)據(jù) bq->pop(&t); t(); //2.結(jié)合某種業(yè)務(wù)邏輯,處理數(shù)據(jù)! std::cout<<"consumer data: "<<t.formatArg()<<t.formatRes()<<std::endl; } } void *productor(void *args) { srand((uint64_t)time(nullptr)^getpid()); BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args); std::string opers="+-*/%"; while(true) { //1.先通過某種渠道獲取數(shù)據(jù) int x=rand()%20+1; int y=rand()%10+1; //2.將數(shù)據(jù)推送到blockqueue -- 完成生產(chǎn)過程 char op=opers[rand()%opers.size()]; Task t(x,y,op); bq->push(t); std::cout<<"productor Task: "<<t.formatArg()<<"?"<<std::endl; } } int main() { //BlockQueue<int> *bq = new BlockQueue<int>(); BlockQueue<Task> *bq = new BlockQueue<Task>(); // 單生產(chǎn),單消費 支持多生產(chǎn),多消費,因為看到同一快資源,使用同一把鎖 pthread_t c, p; pthread_create(&c, nullptr, consumer, bq); pthread_create(&p, nullptr, productor, bq); pthread_join(c, nullptr); pthread_join(p, nullptr); delete bq; return 0; } ```
運行結(jié)果:
2.信號量
概念
信號量本質(zhì)就是一個計數(shù)器,用來描述臨界區(qū)中臨界資源的數(shù)目大小。
臨界資源如果可以被劃分為更小的資源,如果處理得當(dāng),我們也有可能讓多個線程同時訪問臨界資源,從而實現(xiàn)并發(fā)。
但是每個線程想訪問臨界資源,都得先申請信號量資源。
信號量操作接口
申請信號量成功時,臨界資源的數(shù)目會減一;釋放信號量時,臨界資源的數(shù)目會加一。
由于信號量是用來維護(hù)臨界資源的,首先必須得保證自身是安全的,所以常規(guī)的對全局變量的++或–操作肯定是不行的。
P操作(申請信號量)
V操作(釋放信號量)
sem_init、sem_destroy:初始化銷毀信號量(具體用法與mutex和cond十分類似)
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); // pshared: 默認(rèn)為0, value:信號量的初始值(count) int sem_destroy(sem_t *sem); // sem_t :信號量類型 // Link with -pthread. ```
sem_wait、sem_signal: 申請、釋放信號量
int sem_wait(sem_t *sem); // P操作 int sem_post(sem_t *sem); // V操作 // Link with -pthread. ```
基于環(huán)形隊列的生產(chǎn)者消費者模型
但是現(xiàn)在的環(huán)形隊列的判空判滿不再使用中的兩種方式判斷,因為有了信號量可以判定。
隊列為空的時候,消費者和生產(chǎn)者指向同一個位置。(生產(chǎn)和消費線程不能同時進(jìn)行)(生產(chǎn)者執(zhí)行)
隊列為滿的時候,消費者和生產(chǎn)者也指向同一個位置。(生產(chǎn)和消費線程不能同時進(jìn)行)(消費者執(zhí)行)
當(dāng)隊列不為空不為滿的時候,消費者和生產(chǎn)者不指向同一個位置。(生產(chǎn)和消費線程可以并發(fā)執(zhí)行)
根據(jù)上面三種情況,基于環(huán)形隊列的生產(chǎn)者消費者模型應(yīng)該遵守以下規(guī)則:
生產(chǎn)者不能把消費者套一個圈
消費者不能超過生產(chǎn)者
當(dāng)指向同一個位置的時候,要根據(jù)空、滿狀態(tài),判斷讓誰先執(zhí)行
其他情況,消費者和生產(chǎn)者可以并發(fā)執(zhí)行
實現(xiàn):
ringQueue.hpp
#pragma once #include <iostream> #include <vector> #include <pthread.h> #include <semaphore.h> static const int N = 5; template <class T> class RingQueue { private: void P(sem_t &s) { sem_wait(&s); } void V(sem_t &s) { sem_post(&s); } void Lock(pthread_mutex_t &m) { pthread_mutex_lock(&m); } void Unlock(pthread_mutex_t &m) { pthread_mutex_unlock(&m); } public: RingQueue(int num = N) : _ring(num), _cap(num) { sem_init(&_data_sem, 0, 0); sem_init(&_space_sem, 0, num); _c_step = _p_step = 0; pthread_mutex_init(&_c_mutex, nullptr); pthread_mutex_init(&_p_mutex, nullptr); } // 生產(chǎn) void push(const T &in) { // 1. 可以不用在臨界區(qū)內(nèi)部做判斷,就可以知道臨界資源的使用情況 // 2. 什么時候用鎖,對應(yīng)的臨界資源,是否被整體使用 P(_space_sem); // P() Lock(_p_mutex); _ring[_p_step++] = in; _p_step %= _cap; Unlock(_p_mutex); V(_data_sem); } // 消費 void pop(T *out) { P(_data_sem); Lock(_c_mutex); *out = _ring[_c_step++]; _c_step %= _cap; Unlock(_c_mutex); V(_space_sem); } ~RingQueue() { sem_destroy(&_data_sem); sem_destroy(&_space_sem); pthread_mutex_destroy(&_c_mutex); pthread_mutex_destroy(&_p_mutex); } private: std::vector<T> _ring; int _cap; // 環(huán)形隊列容器大小 sem_t _data_sem; // 只有消費者關(guān)心 sem_t _space_sem; // 只有生產(chǎn)者關(guān)心 int _c_step; // 消費位置 int _p_step; // 生產(chǎn)位置 pthread_mutex_t _c_mutex; pthread_mutex_t _p_mutex; };
task.hpp
#pragma once #include <iostream> #include <string> class Task { public: Task() {} Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0) { } void operator()() { switch (_op) { case '+': _result = _x + _y; break; case '-': _result = _x - _y; break; case '*': _result = _x * _y; break; case '/': if(_y==0) _exitCode=-1; else _result = _x / _y; break; case '%': if(_y==0) _exitCode=-1; else _result = _x % _y; break; default: break; } } std::string formatArg() { return std::to_string(_x)+' '+ _op+ ' '+std::to_string(_y)+" = "; } std::string formatRes() { return std::to_string(_result) + "(" +std::to_string(_exitCode)+")"; } ~Task(){} private: int _x; int _y; char _op; int _result; int _exitCode; }; ```
main.cc
#include "ringQueue.hpp" #include "task.hpp" #include <ctime> #include <pthread.h> #include <memory> #include <sys/types.h> #include <unistd.h> #include <cstring> using namespace std; const char *ops = "+-*/%"; void *consumerRoutine(void *args) { RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args); while (true) { Task t; rq->pop(&t); t(); cout << "consumer done, 處理完成的任務(wù)是: " << t.formatRes() << endl; } } void *productorRoutine(void *args) { RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args); while (true) { // sleep(1); int x = rand() % 100; int y = rand() % 100; char op = ops[(x + y) % strlen(ops)]; Task t(x, y, op); rq->push(t); cout << "productor done, 生產(chǎn)的任務(wù)是: " << t.formatArg() << endl; } } int main() { srand(time(nullptr) ^ getpid()); RingQueue<Task> *rq = new RingQueue<Task>(); // 單生產(chǎn)單消費 // pthread_t c, p; // pthread_create(&c, nullptr, consumerRoutine, rq); // pthread_create(&p, nullptr, productorRoutine, rq); // pthread_join(c, nullptr); // pthread_join(p, nullptr); //多生產(chǎn),多消費 pthread_t c[3], p[2]; for (int i = 0; i < 3; i++) pthread_create(c + i, nullptr, consumerRoutine, rq); for (int i = 0; i < 2; i++) pthread_create(p + i, nullptr, productorRoutine, rq); for (int i = 0; i < 3; i++) pthread_join(c[i], nullptr); for (int i = 0; i < 2; i++) pthread_join(p[i], nullptr); delete rq; return 0; } ```
運行結(jié)果
五、總結(jié)
互斥鎖與信號量的異同
互斥鎖由同一線程加放鎖,信號量可以由不同線程進(jìn)行PV操作。
計數(shù)信號量允許多個線程,且值為剩余可用資源數(shù)量。互斥鎖保證多個線程對一個共享資源的互斥訪問,信號量用于協(xié)調(diào)多個線程對一系列資源的訪問條。
條件變量與信號量的異同
使用條件變量可以一次喚醒所有等待者,而這個信號量沒有的功能。
信號量是有一個值,而條件變量是沒有的。從實現(xiàn)上來說一個信號量可以是用mutex + count + cond實現(xiàn)的。因為信號量有一個狀態(tài),可以精準(zhǔn)的同步,信號量可以解決條件變量中存在的喚醒丟失問題。
條件變量一般需要配合互斥鎖使用,而信號量可根據(jù)情況而定。文章來源:http://www.zghlxwxcb.cn/news/detail-713807.html
有了互斥鎖和條件變量還提供信號量的原因是:盡管信號量的意圖在于進(jìn)程間同步,互斥鎖和條件變量的意圖在于線程間同步,但是信號量也可用于線程間,互斥鎖和條件變量也可用于進(jìn)程間。信號量最有用的場景是用以指明可用資源的數(shù)量。文章來源地址http://www.zghlxwxcb.cn/news/detail-713807.html
到了這里,關(guān)于【Linux】線程同步和互斥的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!