進程/線程間的互斥相關(guān)背景概念
?? 臨界資源:多線程指行流共享的資源叫做臨界資源。
?? 臨界區(qū):每個線程內(nèi)部訪問臨界資源的代碼片段叫做臨界區(qū)。
?? 互斥:任何時刻,互斥保證只有一個指行流進入臨界區(qū),訪問臨界資源,通常是對臨界區(qū)起保護作用。
?? 原子性:不被任何調(diào)度所打斷的操作,該類操作只有兩態(tài):完成,未完成。
臨界資源與臨界區(qū)
我們在實現(xiàn)多進程間通信的時候,首先面臨的問題就是要讓多個進程看到同一份資源,因為進程具有獨立性,擁有自己的地址空間。所以,要讓多進程實現(xiàn)通信,就要在內(nèi)核中開辟特定的資源,使其對多個進程都是可見的,也就是說是被多個進程所共享的,通常實現(xiàn)進程間通信的方式:管道,消息隊列,共享內(nèi)存。這些被多進程所共享的資源就叫做臨界資源,同時訪問這些臨界資源的代碼片段就叫做臨界區(qū)。
而對于多線程來說,其優(yōu)勢就是通信十分便利,因為多個執(zhí)行流共享同一個地址空間,所以具有全局性質(zhì)的數(shù)據(jù)都可以被多線程所訪問。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
int g_val = 0;
void *thread_run(void *args)
{
while (true)
{
std::cout << pthread_self() << " g_val : " << g_val++ << std::endl;
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread_run, nullptr);
while (true)
{
std::cout << "main thread - g_val : " << g_val << std::endl;
sleep(1);
}
pthread_join(tid, nullptr);
return 0;
}
代碼中g(shù)_val這個全局變量就是被多線程所共享的(新線程中對g_val++,對應(yīng)主線程中打印出g_val的值也會發(fā)生改變),這種被多線程所共享的資源就叫做臨界資源,而在主線程中打印語句,新線程中打印語句和對g_val++語句,這種訪問臨界資源的代碼片段就叫做臨界區(qū)。
互斥與原子性
當多執(zhí)行流訪問臨界資源時,如果沒有對臨界資源加以保護,那么就可能出現(xiàn)數(shù)據(jù)不一致的情況。例如下面的多線程搶票邏輯:
int tickets = 10000;
void *thread_run(void *args)
{
while (true)
{
if (tickets > 0)
{
usleep(100); // 模擬搶票的時間
std::cout << pthread_self() << " got a ticket :" << tickets-- << std::endl;
usleep(100); // 模擬搶票后的處理動作
}
else
{
break;
}
}
}
const int NUM = 5;
int main()
{
pthread_t tids[NUM];
for (int i = 0; i < NUM; i++)
{
pthread_create(tids + i, nullptr, thread_run, nullptr);
}
for (int i = 0; i < NUM; i++)
{
pthread_join(tids[i], nullptr);
}
return 0;
}
可以發(fā)現(xiàn)票居然被搶到了負數(shù),相當于多出了幾張票,這與理論上票的數(shù)量是不一致的,那么造成這種情況的原因是什么呢?原因:
1,if (tickets > 0) 語句處發(fā)生問題:假設(shè)當票數(shù)只剩一張的時候,某個進程通過了if條件判斷,但是在它搶到票之前因為時間片到了被掛起了,所以此時就會有別的線程也會通過了if條件判斷,這時就會出現(xiàn)只剩最后一張票了,但是有多個線程通過了if判斷,當它們在搶取票的時候,就會出現(xiàn)票數(shù)成負的情況。
2,由于tickets–,這條語句不是原子性的,因為一條–語句會被匯編成多條指令,那么就表示它可能被調(diào)度所打斷,圖示:
對一個數(shù)據(jù)–操作,主要由三步組成:
(1)先將數(shù)據(jù)從內(nèi)存加載到某個寄存器中
(2)對寄存器的數(shù)據(jù)-1
(3)將寄存器的數(shù)據(jù)寫回內(nèi)存。
此條語句不是原子的,可能被調(diào)度所打斷,可能會出現(xiàn)某個線程運行到了第2步的時候,還沒來得及將寄存器中的值寫回內(nèi)存中,就被切換了,此時第二個線程從內(nèi)存中讀取到的tickets的數(shù)據(jù)就是錯誤的。
舉一種極端情況,假設(shè)票總數(shù)為100,第一個線程將票數(shù)減到了99的時候,還沒來得及將寄存器的值寫回內(nèi)存,就被切換了,此時第二個線程被調(diào)度,它一下子搶到了90張票,將tickets減到了10。當?shù)谝粋€線程被再次切換回來的時候,會從它保存的上下文繼續(xù)運行,那么就將99寫回內(nèi)存,這就會造成數(shù)據(jù)的不一致。
所以,為了防止以上出現(xiàn),就要保證執(zhí)行臨界區(qū)的代碼得是互斥的,也就是任何止時刻只有一個線程能夠訪問臨界區(qū),對臨界資源修改。
互斥量
??要解決上述的問題,需要做到三點:
1,代碼必須要有互斥行為:當代碼進入臨界區(qū)執(zhí)行時,不允許其他線程進入該臨界區(qū)。
2,如果多個線程同時要執(zhí)行臨界區(qū)的代碼,并且臨界區(qū)內(nèi)沒有線程在執(zhí)行,那么只能允許一個線程進入臨界區(qū)。
3,如果線程不在臨界區(qū)中執(zhí)行,那么該線程不能阻止其他線程進入臨界區(qū)。
??做到以上三點,那么就需要使用互斥量,本質(zhì)就是一把鎖。在訪問臨界區(qū)之前要先加鎖,訪問完臨界區(qū)后要解鎖,從而保證多線程訪問臨界區(qū)的代碼時是互斥的。
??也就是說對于非臨界區(qū)的代碼允許多線程并發(fā)運行,但是臨界區(qū)的代碼是串行執(zhí)行的。
互斥量接口
互斥量的初始化
??1,靜態(tài)分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
??2,動態(tài)分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
//參數(shù)1:鎖的地址
//參數(shù)2:鎖的屬性:通常置為nullptr即可。
互斥量的銷毀
int pthread_mutex_destroy(pthread_mutex_t *mutex);
注意:
1,靜態(tài)分配的互斥量是不需要被銷毀的。
2,不要銷毀一個正在加鎖的互斥量。
3,已經(jīng)銷毀的互斥量,確保后面不會有線程再嘗試加鎖。
加鎖和解鎖
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
??在調(diào)用pthread_mutex_lock的時候有以下情況:
1,互斥量沒有被其他線程占用,那么該函數(shù)就會鎖定互斥量,返回成功。
2,互斥量已經(jīng)被其他線程占用,或者同時多個線程競爭這個互斥量,但是沒有競爭到互斥量,那么執(zhí)行此函數(shù)的線程就會陷入阻塞狀態(tài),當互斥量被解鎖時就會被喚醒。
改善搶票系統(tǒng)
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_run(void *args)
{
while (true)
{
// 進入臨界區(qū)前要先加鎖
pthread_mutex_lock(&mutex);
if (tickets > 0)
{
usleep(100); // 模擬搶票的時間
std::cout << pthread_self() << " got a ticket :" << tickets-- << std::endl;
pthread_mutex_unlock(&mutex);
usleep(100); // 模擬搶票后的處理動作
}
else // 出臨界區(qū)時要解鎖
{
pthread_mutex_unlock(&mutex);
break;
}
}
}
互斥量原理
??為了實現(xiàn)互斥操作,大多數(shù)的體系結(jié)構(gòu)提供了swap或exchange指令,該指令的作用是把寄存器和內(nèi)存單元的數(shù)據(jù)做交換,由于只有一條指令,所以保證了原子性,即使是多處理器平臺,訪問內(nèi)存的總線周期一定有先后,一個處理器上的交換指令在執(zhí)行時另一個處理器的交換指令只能等待總線周期。
??lock與unlock的偽代碼:
//mutex的起始值為1
lock:
mov $0 , %al
xchgb %al , mutex
if(al寄存器的內(nèi)容 > 0){
return 0;//表示加鎖成功
} else {
掛起等待;
}
goto lock; //再次被喚醒的時候再去競爭鎖
unlock:
mov $1 , mutex
喚醒等待mutex的線程
return 0;
??線程各自擁有一組寄存器,來保存自己的上下文,也就是說寄存器中的內(nèi)容是屬于線程的,當線程被切換出去的時候,寄存器中的內(nèi)容會保存到該線程的上下文中。只要當某個線程運行時al寄存器中的值和mutex交換后為1,那么證明此線程搶到了鎖,即使此線程此刻就被切換出去,其他線程也是搶不到鎖的(此時mutex的值就為0),因為當此線程切換出去的時候al寄存器的內(nèi)容被保存到線程的上下文中,當線程再次被調(diào)度的時候,它會恢復(fù)其上下文,此時al寄存器的值就會被重新置1,在向下執(zhí)行if判斷的時候,表示加鎖成功。
??解鎖的操作就是給mutex置為1,一條mov指令本身就是原子的。解鎖只是給mutex置為1,那么意味著不僅可以加鎖的線程給自己解鎖,還可以其他線程對其解鎖。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_run(void *args)
{
pthread_mutex_lock(&mutex);
std::cout << "i got a mutex" << std::endl;
pthread_mutex_lock(&mutex);//由于自己第一次申請的鎖沒有釋放,當?shù)诙紊暾堟i的時候就會被阻塞
std::cout << "i got a mutex again" << std::endl;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, thread_run, nullptr);
sleep(3);
pthread_mutex_unlock(&mutex);//主線程進行解鎖,那么新線程再次申請鎖的時候就能成功了。
std::cout << "main thread unlock the mutex" << std::endl;
pthread_join(tid, nullptr);
return 0;
}
可重入與線程安全
重入和線程安全的概念
??線程安全:多個線程并發(fā)訪問同一段代碼的時候,不會出現(xiàn)不同的結(jié)果。常見對全局變量,靜態(tài)變量進行操作,在不加鎖保護的情況下,會出現(xiàn)該問題。
?? 重入:同一個函數(shù)被不同的執(zhí)行流調(diào)用,當前一個執(zhí)行流沒有執(zhí)行完,就會有其他的執(zhí)行流再次進入,我們稱之為重入。一個函數(shù)在重入的情況下,運行的結(jié)果不會出現(xiàn)任何問題,則該函數(shù)被稱為可重入函數(shù),否則,就是不可重入函數(shù)。
常見線程不安全情況
??不保護共享變量的函數(shù)
?? 函數(shù)狀態(tài)隨著被調(diào)用而發(fā)生變化(例如,某個函數(shù)內(nèi)部存在一個靜態(tài)變量做計數(shù)器,每次調(diào)用就把計數(shù)器+1)
?? 返回指向靜態(tài)變量指針的函數(shù)
?? 調(diào)用線程不安全函數(shù)的函數(shù)
常見線程安全的情況
?? 每個線程對全局變量或靜態(tài)變量只有讀權(quán)限,而沒有寫入權(quán)限,一般來說這些線程時安全的。
?? 類或者接口對線程來說都是原子操作(例如:加鎖,解鎖的接口)
?? 多個線程之間的切換不會造成接口的執(zhí)行結(jié)果有二義性。
常見不可重入情況
?? 調(diào)用了malloc/free函數(shù),因為malloc函數(shù)是用全局鏈表來管理堆的。
?? 調(diào)用了標準I/O庫函數(shù),標準I/O庫的很多實現(xiàn)都以不可重入的方式使用全局數(shù)據(jù)結(jié)構(gòu)
??可重入函數(shù)體內(nèi)使用了靜態(tài)的數(shù)據(jù)結(jié)構(gòu)
常見可重入情況
??不適用全局變量或者靜態(tài)變量
?? 不適用malloc/new開辟出的空間
?? 不調(diào)用不可重入函數(shù)
?? 不反回靜態(tài)或者全局數(shù)據(jù),所有數(shù)據(jù)都由函數(shù)調(diào)用者來提供
?? 使用本地數(shù)據(jù),或者通過制作全局數(shù)據(jù)的本地拷貝來保護全局數(shù)據(jù)
可重入與線程安全的關(guān)系
??函數(shù)是可重入的那么線程在調(diào)用此函數(shù)期間是線程安全的
?? 函數(shù)是不可重入的,那么就不能被多個線程調(diào)用,有可能引發(fā)線程安全問題
?? 如果一個函數(shù)中有全局變量,那么這個函數(shù)既不是線程安全也不是可重入的
可重入與線程安全的區(qū)別
?? 可重入函數(shù)是線程安全函數(shù)的一種
?? 可重入和不可重入是函數(shù)的一種屬性沒有好壞之分,而線程安全與線程不安全是由好壞之分的。
?? 線程安全不一定是調(diào)用了可重入函數(shù)(例如多個線程分別執(zhí)行不同的函數(shù)),而調(diào)用可重入函數(shù)的線程,在調(diào)用可重入函數(shù)的期間一定是線程安全的
??如果對臨界資源加鎖保護,那么這個函數(shù)是線程安全的函數(shù),也是可重入的,但是如果這個重入函數(shù)沒有對鎖進行釋放,那么就會產(chǎn)生死鎖,就是不可重入的。
死鎖
死鎖概念
??死鎖是指一組進程中的各個進程均占有不會釋放的資源,但因互相申請被其他進程所占用的不會釋放的資源而處于一種永久的等待狀態(tài)。
死鎖的四個必要條件
1,互斥:一個資源每次只能被一個指行流使用
2,請求與保持:一個執(zhí)行流因請求資源而阻塞時,對已經(jīng)獲得的資源保持不妨
3,不剝奪:一個執(zhí)行流已獲得的資源,在未使用之前,不能強行剝奪
4,循環(huán)等待:若干執(zhí)行流之間形成一種頭尾相接的循環(huán)等待資源的關(guān)系
如何避免死鎖
??破壞形成死鎖四個必要條件之一即可。
1,不加鎖
2,主動釋放鎖(申請鎖時采用try lock,當多次不成功時,先不再申請并且將自己占有的資源釋放)
3,按順序加鎖
4,控制線程統(tǒng)一釋放鎖
線程的同步
條件變量
??當一個線程互斥的訪問一個臨界資源時,他可能發(fā)現(xiàn)在其他線程改變狀態(tài)之前,它什么也做不了。
??例如,將之前搶票的邏輯改一下,當線程發(fā)現(xiàn)tickets <= 0的時候,不break而是選擇繼續(xù)搶票,因為存在某個線程退票的情況,但是在沒有其他線程退票之前,這個線程就會反復(fù)的申請鎖-釋放鎖但申請鎖之后什么都做不了,這就會導(dǎo)致其他線程的饑餓狀態(tài),并且反復(fù)的申請鎖釋放鎖搶到鎖后什么都不做是一種資源的浪費,這種情況就需要用到條件變量-當if判斷tickets <= 0的時候,不選擇繼續(xù)去搶票而是進入到條件變量中等待,當臨界資源滿足條件時再通知其繼續(xù)做相應(yīng)得搶票操作。
void *thread_run(void *args)
{
while (true)
{
// 進入臨界區(qū)前要先加鎖
pthread_mutex_lock(&mutex);
std::cout << pthread_self() << " got the mutex" << std::endl;
if (tickets > 0)
{
usleep(100); // 模擬搶票的時間
std::cout << pthread_self() << " got a ticket :" << tickets-- << std::endl;
pthread_mutex_unlock(&mutex);
usleep(100); // 模擬搶票后的處理動作
}
else
{
pthread_mutex_unlock(&mutex);
}
}
}
可以看到,當票被搶光后由于沒有其他線程退票,導(dǎo)致線程id后四位為1920的線程頻繁申請和釋放鎖,但是它在申請到鎖后并沒有做任何事情。
線程同步
??在保證數(shù)據(jù)安全的情況下,讓線程按照某種特定的順序訪問臨界資源,從而避免饑餓問題,叫做同步。
條件變量操作
- 初始化
//動態(tài)分配
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);
//第一個參數(shù)為條件變量的地址
//第二個參數(shù)為條件變量的屬性,通常置為nulltr
//靜態(tài)分配
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- 銷毀
//如果動態(tài)分配就需要調(diào)用destroy銷毀
int pthread_cond_destroy(pthread_cond_t *cond)
- 使線程在條件變量中等待條件滿足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
//第一個參數(shù)表示在哪個條件變量中等待
//第二個參數(shù)為互斥鎖
- 喚醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
//喚醒一個
int pthread_cond_signal(pthread_cond_t *cond);
//全部喚醒
??簡單案例,讓線程按一定順序運行
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *thread_run(void *args)
{
while (true)
{
pthread_mutex_lock(&mutex);
// 保護的臨界資源...
pthread_cond_wait(&cond, &mutex);
std::cout << pthread_self() << " 活動" << std::endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t tids[3];
for (int i = 0; i < 3; i++)
{
pthread_create(tids + i, nullptr, thread_run, nullptr);
}
sleep(1);
while (true)
{
pthread_cond_signal(&cond);
sleep(1);
}
for (int i = 0; i < 3; i++)
{
pthread_join(tids[i], nullptr);
}
return 0;
}
??條件變量的原理:實際上條件變量內(nèi)部維護一個隊列結(jié)構(gòu),當一個線程被wait的時候就會插入到隊列的尾部等待,當喚醒等待時就是將隊列頭部的線程喚醒(指的是signal喚醒一個的時候)。
為什么pthread_cond_wait需要互斥量?
??使用條件變量是實現(xiàn)線程同步的一種手段,當臨界資源不滿足要求的時候就將此線程wait,放到條件變量中等待,等到臨界資源滿足要求的時候再將線程喚醒,所以使用條件變量一定是多線程對臨界資源并發(fā)訪問的情況,所以一定需要互斥量來對臨界資源做保護,對臨界資源做法判斷是否滿足要求一定是在加鎖之后,在進入條件變量等待之前。所以,線程在進入條件變量中等待之前一定是持有鎖的,但是這個線程進入條件變量中等待時,其他線程是可以工作的(也就是說可以去競爭鎖的),所以在進入條件變量中等待的時候一定要將鎖釋放,某則就會造成死鎖問題。
??并且當線程被喚醒,出條件變量之前一定要重新申請鎖,因為線程被喚醒之后,會恢復(fù)其上下文繼續(xù)運行,也就是說會繼續(xù)在臨界區(qū)內(nèi)運行,為了保證臨界資源的安全,在線程進入到臨界區(qū)內(nèi)的時候必須是持有鎖的,所以線程出條件變量之前是要重新申請鎖的。
條件變量錯誤設(shè)計
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解鎖之后,等待之前,條件可能已經(jīng)滿足,信號已經(jīng)發(fā)出,但是該信號可能被錯過
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
??由于解鎖和等待不是原子操作。當某個線程判斷條件不滿足時,首先要釋放鎖,在其進入條件變量中等待之前,可能有其他線程已經(jīng)申請到了鎖,并且給其發(fā)送了喚醒的信號,那么此線程就會錯過此信號,可能導(dǎo)致線程永遠阻塞在這個條件變量中。所以解鎖和等待必須是原子操作。
基于阻塞隊列的生產(chǎn)消費模型
??生產(chǎn)消費模型是多線程同步與互斥的典型例子,生產(chǎn)線程與消費線程通過一個緩沖區(qū)(交易場所)來進行數(shù)據(jù)的交互,并且生產(chǎn)消費模型是一個高效的,忙閑不均的,生產(chǎn)者與消費者解耦的模型。
??基于阻塞隊列的生產(chǎn)消費模型指的是連接生產(chǎn)線程和消費線程的緩沖區(qū)是一個定長的隊列,那么這個阻塞隊列是被所有線程共享的,就是一份臨界資源,那么就要用互斥量對齊保護,保證在任意時刻只有一個線程能夠進入臨界區(qū),對臨界資源修改,所以阻塞隊列要配備一把互斥鎖。
??生產(chǎn)消費模型的特點:
1,三種關(guān)系:
(1)生產(chǎn)者與生產(chǎn)者之間是互斥關(guān)系。
(2)消費者與消費者之間是互斥互斥關(guān)系。
(3)生產(chǎn)者與消費者之間是同步與互斥的關(guān)系。
生產(chǎn)者與消費者的同步關(guān)系體現(xiàn)在:
當阻塞隊列為滿的時候,生產(chǎn)者線程就不能再生產(chǎn)了,就要進入到條件變量中等待。
當阻塞隊列為空的時候,消費者線程就不能再消費了,就要進入到條件變量中等待。
并且,消費者線程清楚此時緩沖區(qū)是否未滿,所以消費線程可以根據(jù)一定的策略來喚醒生產(chǎn)線程。
同理,生產(chǎn)線程清楚此時緩沖區(qū)是否為空,所以生產(chǎn)線程可以根據(jù)一定的策略來喚醒等待中的消費線程。
所以,生產(chǎn)線程與消費線程是按照一定的順序被調(diào)度運行的,避免了饑餓問題,所以說生產(chǎn)者與消費者之間存在同步的關(guān)系。
生產(chǎn)者與消費者的互斥關(guān)系體現(xiàn)在:
由于阻塞隊列是一份臨界資源,所以在任一時刻,只能有一個線程對臨界資源作修改,所以生產(chǎn)者與消費者之間存在互斥關(guān)系。
2,兩種角色:生產(chǎn)者與消費者
3,一個場所:指的是一個緩沖區(qū)
??所以,這個模型中不僅要存在一把互斥鎖,還要有屬于生產(chǎn)者線程的條件變量,屬于消費者的條件變量。
??生產(chǎn)消費模型的高效體現(xiàn)在哪里?忙閑不均又是如何體現(xiàn)的?
緩沖區(qū)作為臨界資源,對于生產(chǎn)線程和消費線程來說都是串行執(zhí)行的,所以這里是體現(xiàn)不出高效性的。
高效性是體現(xiàn)在:當消費者在對數(shù)據(jù)做處理的時候,生產(chǎn)者此時可以生產(chǎn)數(shù)據(jù),也可以將數(shù)據(jù)輸送到緩沖區(qū)中。
同理,當生產(chǎn)者在生產(chǎn)數(shù)據(jù)的時候,消費者可以從緩沖區(qū)中取出數(shù)據(jù),或者對數(shù)據(jù)做處理。
它們的忙閑不均也是體現(xiàn)在,如果緩沖區(qū)已滿時,只有消費線程在工作,同理緩沖區(qū)為空的時候,只有生產(chǎn)者在工作。
并且,這個模型也做到了將生產(chǎn)線程與消費線程解耦。
代碼實現(xiàn)
LockGuard.hpp
#pragma once
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *p) : _pmutex(p) {}
void lock() { pthread_mutex_lock(_pmutex); }
void unlock() { pthread_mutex_unlock(_pmutex); }
~Mutex() {}
private:
pthread_mutex_t *_pmutex;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *p) : _mutex(p) { _mutex.lock(); }
~LockGuard() { _mutex.unlock(); }
private:
Mutex _mutex;
};
??這是對互斥鎖的簡單封裝,創(chuàng)建的LockGuard對象在構(gòu)造的時候就會加鎖,出作用域析構(gòu)的時候就會自動解鎖,不用再去手動的解鎖了。
BlockQueue.hpp
#pragma once
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "LockGuard.hpp"
template <typename T>
class BlockQueue
{
public:
BlockQueue(int capcity = 1) : _capcity(capcity)
{
// 對鎖和條件變量初始化
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond_consumer, nullptr);
pthread_cond_init(&_cond_productor, nullptr);
}
~BlockQueue()
{
// 對鎖和條件變量銷毀
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_consumer);
pthread_cond_destroy(&_cond_productor);
}
bool empty() const { return _q.empty(); }
bool full() const { return _capcity == _q.size(); }
void push(const T &t)
{
{
LockGuard guard(&_mutex);
while (full() == true)
{
pthread_cond_wait(&_cond_productor, &_mutex);
}
_q.push(t);
// 生產(chǎn)者生產(chǎn)完就可以喚醒消費者了
pthread_cond_signal(&_cond_consumer);
}
}
void pop(T &got)
{
{
LockGuard guard(&_mutex);
while (empty() == true)
{
pthread_cond_wait(&_cond_consumer, &_mutex);
}
got = _q.front();
_q.pop();
// 消費者消費完可以通知生產(chǎn)者進行生產(chǎn)
pthread_cond_signal(&_cond_productor);
}
}
private:
int _capcity;
std::queue<T> _q;
pthread_mutex_t _mutex;
pthread_cond_t _cond_productor;
pthread_cond_t _cond_consumer;
};
??截圖中為什么使用while判斷,而不是if判斷的原因
因為,如果消費者取出數(shù)據(jù)后,隊列中有一個空余的位置,但是生產(chǎn)者的條件變量中阻塞了多個生產(chǎn)者,而此時消費者是通過broadcast來喚醒生產(chǎn)的的話,那就會面臨多個生產(chǎn)者而只有一個空位可供生產(chǎn)的問題。
同理,消費者那里也是用while判斷。
mythread.cc文章來源:http://www.zghlxwxcb.cn/news/detail-492723.html
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
#include "BlockQueue.hpp"
#include "LockGuard.hpp"
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int x = rand() % 100 + 1;
bq->push(x);
{
LockGuard guard(&mutex);
std::cout << pthread_self() << " push a data : " << x << std::endl;
}
}
return nullptr;
}
void *thread_consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
sleep(1);
int x;
bq->pop(x);
{
LockGuard guard(&mutex);
std::cout << pthread_self() << " pop a data : " << x << std::endl;
}
}
return nullptr;
}
int main()
{
srand((size_t)time(0));
pthread_t P[3];
pthread_t C[3];
BlockQueue<int> *bq = new BlockQueue<int>(3);
pthread_create(P, nullptr, thread_productor, bq);
pthread_create(P + 1, nullptr, thread_productor, bq);
pthread_create(P + 2, nullptr, thread_productor, bq);
pthread_create(C, nullptr, thread_consumer, bq);
pthread_create(C + 1, nullptr, thread_consumer, bq);
pthread_create(C + 2, nullptr, thread_consumer, bq);
for (int i = 0; i < 3; i++)
{
pthread_join(P[i], nullptr);
pthread_join(C[i], nullptr);
}
return 0;
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-492723.html
到了這里,關(guān)于Linux-線程的同步與互斥的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!