AQS抽象同步器核心原理
在爭用激烈的場景下使用基于CAS自旋實現(xiàn)的輕量級鎖有兩個大的問題:
- CAS惡性空自旋會浪費大量的CPU資源。
- 在SMP架構(gòu)的CPU上會導(dǎo)致“總線風(fēng)暴”。
解決CAS惡性空自旋的有效方式之一是以空間換時間,較為常見的方案有兩種:分散操作熱點、使用隊列削峰。JUC并發(fā)包使用的是隊列削峰的方案解決CAS的性能問題,并提供了一個基于雙向隊列的削峰基類——抽象基礎(chǔ)類AbstractQueuedSynchronizer(抽象同步器類,簡稱為AQS)。
鎖與隊列的關(guān)系
無論是單體服務(wù)應(yīng)用內(nèi)部的鎖,還是分布式環(huán)境下多體服務(wù)應(yīng)用所使用的分布式鎖,為了減少由于無效爭奪導(dǎo)致的資源浪費和性能惡化,一般都基于隊列進行排隊與削峰。
- CLH 鎖的內(nèi)部隊列
CLH自旋鎖使用的CLH是一個單向隊列,也是一個FIFO隊列。在獨占鎖中,競爭資源在一個時間點只能被一個線程鎖訪問;隊列的
隊首節(jié)點(隊列的頭部)表示占有鎖的節(jié)點,新加入的搶鎖線程則需要等待,會插入到隊列的尾部。CLH鎖的內(nèi)部結(jié)構(gòu)如圖所示:
-
分布式鎖的內(nèi)部隊列
在分布式鎖的實現(xiàn)中,比較常見的也是基于隊列的方式進行不同節(jié)點中“等鎖線程”的統(tǒng)一調(diào)度和管理。以基于ZooKeeper的分布式鎖為例,其等待隊列的結(jié)構(gòu)大致如圖所示: -
AQS 的內(nèi)部隊列
AQS是JUC提供的一個用于構(gòu)建鎖和同步容器的基礎(chǔ)類。JUC包內(nèi)的許多類都是基于AQS構(gòu)建,例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。AQS解決了在實現(xiàn)同步容器時設(shè)計的大量細(xì)節(jié)問題,主要原理和CLH隊列差不多。AQS隊列內(nèi)部維護的是一個FIFO的雙向鏈表,這種結(jié)構(gòu)的特點是每個數(shù)據(jù)結(jié)構(gòu)都有兩個指針,分別指向直接的前驅(qū)節(jié)點和直接的后驅(qū)節(jié)點。所以雙向鏈表可以從任意一個節(jié)點開始很方便地訪問前驅(qū)節(jié)點和后驅(qū)節(jié)點。每個節(jié)點其實是由線程封裝的,當(dāng)線程爭搶鎖失敗后會封裝成Node加入到AQS隊列中去;當(dāng)獲取鎖的線程釋放鎖以后,會從隊列中喚醒一個阻塞的節(jié)點(線程)。AQS的內(nèi)部結(jié)構(gòu)如圖所示:
AQS 的核心成員
AQS出于“分離變與不變”的原則,基于模板模式實現(xiàn)。AQS為鎖獲取、鎖釋放的排隊和出隊過程提供了一系列的模板方法。由于JUC的顯式鎖種類豐富,因此AQS將不同鎖的具體操作抽取為鉤子方法,供各種鎖的子類(或者其內(nèi)部類)去實現(xiàn)。
狀態(tài)標(biāo)志位
AQS中維持了一個單一的volatile修飾的狀態(tài)信息state,AQS使用int類型的state標(biāo)示鎖的狀態(tài),可以理解為鎖的同步狀態(tài)。
//同步狀態(tài),使用volatile保證線程可見
private volatile int state;
state因為使用volatile保證了操作的可見性,所以任何線程通過getState()獲得狀態(tài)都是可以得到最新值。AQS提供了getState()、setState()來獲取和設(shè)置同步狀態(tài)。由于setState()無法保證原子性,因此AQS給我們提供了compareAndSetState()方法利用底層
UnSafe 的 CAS 機 制 來 實 現(xiàn) 原 子 性 。 compareAndSetState() 方 法 實 際 上 調(diào) 用 的 是 unsafe 成 員 的compareAndSwapInt()方法。以ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程執(zhí)行該鎖的lock()操作時,會調(diào)用tryAcquire()獨占該鎖并將state加1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其他線程才有機會獲取該鎖。當(dāng)然,釋放鎖之前,A線程自己是可以重復(fù)獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)。
AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,這個基類只有一個變量叫exclusiveOwnerThread,表示當(dāng)前占用該鎖的線程,并且提供了相應(yīng)的get()和set()方法。
隊列節(jié)點類
AQS是一個虛擬隊列,不存在隊列實例,僅存在節(jié)點之間的前后關(guān)系。節(jié)點類型通過內(nèi)部類Node定義,其核心的成員如下:
static final class Node {
/**節(jié)點等待狀態(tài)值1:取消狀態(tài)*/
static final int CANCELLED =1;
/**節(jié)點等待狀態(tài)值-1:標(biāo)識后繼線程處于等待狀態(tài)*/
static final int SIGNAL = -1;
/**節(jié)點等待狀態(tài)值-2:標(biāo)識當(dāng)前線程正在進行條件等待*/
static final int CONDITION = -2;
/**節(jié)點等待狀態(tài)值-3:標(biāo)識下一次共享鎖的acquireShared操作需要無條件傳播*/
static final int PROPAGATE = -3;
//節(jié)點狀態(tài):值為SIGNAL、CANCELLED、CONDITION、PROPAGATE、0
//普通的同步節(jié)點的初始值為0,條件等待節(jié)點的初始值為CONDITION(-2)
volatile int waitStatus;
//節(jié)點所對應(yīng)的線程,為搶鎖線程或者條件等待線程
volatile Thread thread;
//前驅(qū)節(jié)點,當(dāng)前節(jié)點會在前驅(qū)節(jié)點上自旋,循環(huán)檢查前驅(qū)節(jié)點的waitStatus狀態(tài)
volatile Node prev;
//后驅(qū)節(jié)點
volatile Node next;
//如果當(dāng)前Node不是普通節(jié)點而是條件等待節(jié)點,則節(jié)點處于某個條件的等待隊列上
//此屬性指向下一個條件等待節(jié)點,即其條件隊列上的后驅(qū)節(jié)點
Node nextWaiter;
...
}
waitStatus 屬性
每個節(jié)點與等待線程關(guān)聯(lián),每個節(jié)點維護一個狀態(tài)waitStatus,waitStatus的各種值以常量的形式進行定義。waitStatus的各常量值具體如下:
-
static final int CANCELLED = 1
waitStatus值為1時表示該線程節(jié)點已釋放(超時、中斷),已取消的節(jié)點不會再阻塞。表示線程因為中斷或者等待超時,需要從等待隊列中取消等待。由于該節(jié)點線程等待超時或者被中斷,需要從同步隊列中取消等待,因此該線程被置1。節(jié)點進入了取消狀態(tài),該類型節(jié)點不會參與競爭,且會一直保持取消狀態(tài)。 -
static final int SIGNAL = ?1
waitStatus為SIGNAL(?1)時表示其后驅(qū)節(jié)點處于等待狀態(tài),當(dāng)前節(jié)點對應(yīng)的線程如果釋放了同步狀態(tài)或者被取消,將會通知后驅(qū)節(jié)點,使后驅(qū)節(jié)點的線程得以運行。 -
static final int CONDITION =?2
waitStatus為?2時,表示該線程在條件隊列中阻塞(Condition有使用),表示節(jié)點在等待隊列中(這里指的是等待在某個鎖的CONDITION上),當(dāng)持有鎖的線程調(diào)用了CONDITION的signal()方法之后,節(jié)點會從該CONDITION的等待隊列轉(zhuǎn)移到該鎖的同步隊列上去競爭鎖(注意:這里的同步隊列就是我們說的AQS維護的FIFO隊列,等待隊列則是每個CONDITION關(guān)聯(lián)的隊列)。
節(jié)點處于等待隊列中,節(jié)點線程等待在CONDITION上,當(dāng)其他線程對CONDITION調(diào)用了signal()方法后,該節(jié)點從等待隊列中轉(zhuǎn)移到同步隊列中,加入到對同步狀態(tài)的獲取中。 -
static final int PROPAGATE = ?3
waitStatus為?3時,表示下一個線程獲取共享鎖后,自己的共享狀態(tài)會被無條件地傳播下去,因為共享鎖可能出現(xiàn)同時有N個鎖可以用,這時直接讓后面的N個節(jié)點都來工作。這種狀態(tài)在CountDownLatch中使用到了。
為什么當(dāng)一個節(jié)點的線程獲取共享鎖后,要喚醒后繼共享節(jié)點?共享鎖是可以多個線程共有的,當(dāng)一個節(jié)點的線程獲取共享鎖后,必然要通知后繼共享節(jié)點的線程也可以獲取鎖了,這樣就不會讓其他等待的線程等很久,這種向后通知(傳播)的目的也是盡快通知其他等待的線程盡快獲取鎖。 -
waitStatus為0
waitStatus為0時,表示當(dāng)前節(jié)點處于初始狀態(tài)。Node節(jié)點的waitStatus狀態(tài)為以上5種狀態(tài)的一種。
thread 成員
Node的thread成員用來存放進入AQS隊列中的線程引用;Node的nextWaiter成員用來指向自己的后繼等待節(jié)點,此成員只有線程處于條件等待隊列中的時候使用。
搶占類型常量標(biāo)識
Node節(jié)點還定義了兩個搶占類型常量標(biāo)識:SHARED和EXCLUSIVE,具體的代碼如下:
static final class Node {
//標(biāo)識節(jié)點在搶占共享鎖
static final Node SHARED = new Node();
//標(biāo)識節(jié)點在搶占獨占鎖
static final Node EXCLUSIVE = null;
}
SHARED表示線程是因為獲取共享資源時阻塞而被添加到隊列中的;EXCLUSIVE表示線程因為獲取獨占資源時阻塞而被添加到隊列中的。
FIFO 雙向同步隊列
AQS的內(nèi)部隊列是CLH隊列的變種,每當(dāng)線程通過AQS獲取鎖失敗時,線程將被封裝成一個Node節(jié)點,通過CAS原子操作插入隊列尾部。當(dāng)有線程釋放鎖時,AQS會嘗試讓隊首的后驅(qū)節(jié)點占用鎖。
AQS是一個通過內(nèi)置的FIFO雙向隊列來完成線程的排隊工作,內(nèi)部通過節(jié)點head和tail記錄隊首和隊尾元素,元素的節(jié)點類型為Node類型,具體的代碼如下:
/*首節(jié)點的引用*/
private transient volatile Node head;
/*尾節(jié)點的引用*/
private transient volatile Node tail;
AQS的隊首節(jié)點和隊尾節(jié)點都是懶加載的。懶加載的意思是在需要的時候才真正創(chuàng)建。只有在線程競爭失敗的情況下,有新線程加入同步隊列時,AQS才創(chuàng)建一個head節(jié)點。head節(jié)點只能被setHead()方法修改,并且節(jié)點的waitStatus不能為CANCELLED。隊尾節(jié)點只在有新線程阻塞時才被創(chuàng)建。一個包含5個節(jié)點的AQS同步隊列的基本結(jié)構(gòu)如圖所示:
JUC 顯式鎖與 AQS 的關(guān)系
AQS是java.util.concurrent包的一個同步器,它實現(xiàn)了鎖的基本抽象功能,支持獨占鎖與共享鎖兩種方式。該類使用模板模式來實現(xiàn)的,成為構(gòu)建鎖和同步器的框架,使用該類可以簡單且高效地構(gòu)造出應(yīng)用廣泛的同步器(或者等待隊列)。
java.util.concurrent.locks包中的顯式鎖如ReentrantLock、ReentrantReadWriteLock,線程同步工具如Semaphore,異步回調(diào)工具如FutureTask等,內(nèi)部都使用了AQS作為等待隊列。通過開發(fā)工具進行AQS的子類導(dǎo)航會發(fā)現(xiàn)大量的AQS子類以內(nèi)部類的形式使用。同樣,我們也能繼承AQS類去實現(xiàn)自己需求的同步器(或鎖)。
ReentrantLock 與 AQS 的組合關(guān)系
ReentrantLock是一個可重入的互斥鎖,又稱為“可重入獨占鎖”。ReentrantLock鎖在同一個時間點只能被一個線程鎖持有,而可重入的意思是,ReentrantLock鎖可以被單個線程多次獲取。經(jīng)過觀察,ReentrantLock把所有Lock接口的操作都委派到一個Sync類上,該類繼承了AbstractQueuedSynchronizer:
static abstract class Sync extends AbstractQueuedSynchronizer
ReentrantLock為了支持公平鎖和非公平鎖兩種模式,為Sync又定義了兩個子類,具體如下:
final static class NonfairSync extends Sync {...}
final static class FairSync extends Sync
NonfairSync為非公平(或者不公平)同步器,F(xiàn)airSync為公平同步器。ReentrantLock提供了兩個構(gòu)造器,具體如下:
public ReentrantLock() {
//默認(rèn)的構(gòu)造器
sync = new NonfairSync();
//內(nèi)部使用非公平同步器
}
public ReentrantLock(boolean fair) {
//true 為公平鎖,否則為非公平鎖
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的默認(rèn)構(gòu)造器(無參數(shù)構(gòu)造器)被初始化為一個NonfairSync對象,即使用非公平同步器,所以,默認(rèn)情況下ReentrantLock為非公平鎖。帶參數(shù)的構(gòu)造器可以根據(jù)fair參數(shù)的值具體指定ReentrantLock的內(nèi)部同步器使用FairSync還是NonfairSync。
由ReentrantLock的lock()和unlock()的源碼可以看到,它們只是分別調(diào)用了sync對象的lock()和release()方法。
public void lock() {
//搶占顯式鎖
sync.lock();
}
public void unlock() {
//釋放顯式鎖
sync.release(1);
}
通過以上的委托代碼可以看出,ReentrantLock的顯式鎖操作是委托(或委派)給一個Sync內(nèi)部類的實例完成的。而Sync內(nèi)部類只是AQS的一個子類,所以本質(zhì)上ReentrantLock的顯式鎖操作是委托(或委派)給AQS完成的。一個ReentrantLock對象的內(nèi)部一定有一個AQS類型的組合實例,二者之間是組合關(guān)系。
組合和聚合比較類似,二者都表示整體和部分之間的關(guān)系。
聚合關(guān)系的特點是:整體由部分構(gòu)成,但是整體和部分之間并不是強依賴的關(guān)系,而是弱依賴的關(guān)系,也就是說,即使整體不存在了,部分仍然存在。例如一個部門由多個員工組成,如果部門撤銷了,人員不會消失,人員依然存在。
組合關(guān)系的特點是:整體由部分構(gòu)成,但是整體和部分之間是強依賴的關(guān)系,如果整體不存在了,部分也隨之消失。例如一個公司由多個部門組成,如果公司不存在了,部門也將不存在??梢哉f,組合關(guān)系是一種強依賴的、特殊的聚合關(guān)系。
由于顯式鎖與AQS之間是一種強依賴的聚合關(guān)系,如果顯式鎖的實例銷毀,其聚合的AQS子類實例也被銷毀,因此顯式鎖與AQS之間是組合關(guān)系。
AQS 中的模板模式
AQS同步器是基于模板模式設(shè)計的,并且是模板模式經(jīng)典的一個運用,模板模式是很容易理解的設(shè)計模式之一。如果需要自定義同步器,一般的方法是繼承AQS,并重寫指定方法(鉤子方法),按照自己定義的規(guī)則對state(鎖的狀態(tài)信息)進行獲取與釋放;將AQS組合在自定義同步組件的實現(xiàn)中,自定義同步器去調(diào)用AQS的模板方法,而這些模板方法會調(diào)用重寫的鉤子方法。
模板模式
模板模式是類的行為模式。準(zhǔn)備一個抽象類,將部分邏輯以具體方法的形式實現(xiàn),然后聲明一些抽象方法來迫使子類實現(xiàn)剩余的邏輯。不同的子類提供不同的方式實現(xiàn)這些抽象方法,從而對剩余的邏輯有不同的實現(xiàn)。模板模式的關(guān)鍵在于:父類提供框架性的公共邏輯,子類提供個性化的定制邏輯。
模板模式的定義
在模板模式中,由抽象類定義模板方法和鉤子方法,模板方法定義一套業(yè)務(wù)算法框架,算法框架中的某些步驟由鉤子方法負(fù)責(zé)完成。具體的子類可以按需要重寫鉤子方法。模板方法的調(diào)用將通過抽象類的實例來完成。模板模式所包含的角色有抽象類和具體類,二者之間的關(guān)系如圖所示:
模板方法和鉤子方法
模板方法(Template Method)也常常被稱為骨架方法,主要定義了整個方法需要實現(xiàn)的業(yè)務(wù)操作的算法框架。其中,調(diào)用不同方法的順序因人而異,而且這個方法也可以做成一個抽象方法,要求子類自行定義邏輯流程。
鉤子方法(Hook Method)是被模板方法的算法框架所調(diào)用的,而由子類提供具體的實現(xiàn)方法。在抽象父類中,鉤子方法常被定為一個空方法或者抽象方法,需要由子類去實現(xiàn)。鉤子方法的存在可以讓子類提供算法框架中的某個細(xì)分操作,從而讓子類實現(xiàn)算法中可選的、需要變動的部分。
模板模式的優(yōu)點
分離變與不變是軟件設(shè)計的一個基本原則。模板模式將不變的部分封裝在基類的骨架方法中,而將變化的部分通過鉤子方法進行封裝,交給子類去提供具體的實現(xiàn),在一定程度上優(yōu)美地闡述了“分離變與不變”這一軟件設(shè)計原則。
模板模式的優(yōu)點如下:
- 通過算法骨架最大程度地進行了代碼復(fù)用,減少重復(fù)代碼。
- 模板模式提取了公共部分代碼,便于統(tǒng)一維護。
- 鉤子方法是由子類實現(xiàn)的,因此子類可以通過拓展增加復(fù)雜的功能,符合開放封閉原則。
開放封閉原則是面向?qū)ο笤O(shè)計的五大原則之一,其核心思想是:對擴展開放,對修改關(guān)閉。面向?qū)ο笤O(shè)計的五大原則:單一職責(zé)原則、依賴倒置原則、接口隔離原則、里氏替換原則和開放封閉原則。
AQS 的模板流程
AQS定義了兩種資源共享方式:
- Exclusive(獨享鎖):只有一個線程能占有鎖資源,如ReentrantLock。獨享鎖又可分為公平鎖和非公平鎖。
- Share(共享鎖):多個線程可同時占有鎖資源,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock的Read鎖。
AQS為不同的資源共享方式提供了不同的模板流程,包括共享鎖、獨享鎖模板流程。這些模板流程完成了具體線程進出等待隊列的基礎(chǔ)(如獲取資源失敗入隊/喚醒出隊等)、通用邏輯?;诨A(chǔ)、通用邏輯,AQS提供一種實現(xiàn)阻塞鎖和依賴FIFO等待隊列的同步器的框架,AQS模板為ReentrantLock、CountDownLatch、Semaphore提供了優(yōu)秀的解決方案。
自定義的同步器只需要實現(xiàn)共享資源state的獲取與釋放方式即可,這些邏輯都編寫在鉤子方法中。無論是共享鎖還是獨享鎖,AQS在執(zhí)行模板流程時會回調(diào)自定義的鉤子方法。
AQS 中的鉤子方法
自定義同步器時,AQS中需要重寫的鉤子方法大致如下:
- tryAcquire(int):獨占鎖鉤子,嘗試獲取資源。若成功則返回true,若失敗則返回false。
- tryRelease(int):獨占鎖鉤子,嘗試釋放資源。若成功則返回true,若失敗則返回false。
- tryAcquireShared(int):共享鎖鉤子,嘗試獲取資源,負(fù)數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享鎖鉤子,嘗試釋放資源。若成功則返回true,若失敗則返回false。
- isHeldExclusively():獨占鎖鉤子,判斷該線程是否正在獨占資源。只有用到condition條件隊列時才需要去實現(xiàn)它。
以上鉤子方法的默認(rèn)實現(xiàn)會拋出UnsupportedOperationException異常。除了這些鉤子方法外,AQS類中的其他方法都是final類型的方法,所以無法被其他類繼承,只有這幾個方法可以被其他類繼承。
對鉤子方法的具體介紹如下:
-
tryAcquire 獨占式獲取鎖
顧名思義,就是嘗試獲取鎖,AQS在這里沒有對tryAcquire()進行功能的實現(xiàn),只有一個拋出異常的語句,我們需要自己對其進行實現(xiàn),可以對其重寫實現(xiàn)公平鎖、不公平鎖、可重入鎖、不可重入鎖。protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
-
tryRelease 獨占式釋放鎖
tryRelease嘗試釋放獨占鎖,需要子類來實現(xiàn)。
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
- tryAcquireShared 共享式獲取
tryAcquireShared嘗試進行共享鎖的獲得,需要子類來實現(xiàn)。
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}
- tryReleaseShared 共享式釋放
tryReleaseShared嘗試進行共享鎖的釋放,需要子類來實現(xiàn)。
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}
- 查詢是否處于獨占模式
isHeldExclusively的功能是查詢線程是否正在獨占資源。在獨占鎖的條件隊列中用到。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
通過 AQS 實現(xiàn)一把簡單的獨占鎖
由于ReentrantLock的實現(xiàn)比較復(fù)雜,為了降低學(xué)習(xí)難度,首先模擬ReentrantLock的源碼并且基于AQS實現(xiàn)一把非常簡單的獨占鎖:
基于AQS實現(xiàn)一把非常簡單的獨占鎖的類為SimpleMockLock,它的UML類圖如圖所示:
其中Lock是鎖的接口,定義了鎖的操作的抽象方法,具體實現(xiàn)由子類實現(xiàn),而其實現(xiàn)類大多都使用AQS作為其實現(xiàn)的等待隊列(線程等待獲取鎖的隊列),所以可以說AQS是對鎖真正的實現(xiàn)。
SimpleMockLock是一個基于AQS的、簡單的非公平獨占鎖實現(xiàn),代碼如下:
public class SimpleMockLock implements Lock {
//同步器實例
private final Sync sync = new Sync();
// 自定義的內(nèi)部類:同步器
// 直接使用 AbstractQueuedSynchronizer.state 值表示鎖的狀態(tài)
// AbstractQueuedSynchronizer.state=1 表示鎖沒有被占用
// AbstractQueuedSynchronizer.state=0 表示鎖已經(jīng)被占用
private static class Sync extends AbstractQueuedSynchronizer {
//鉤子方法
protected boolean tryAcquire(int arg) {
//CAS更新狀態(tài)值為1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//鉤子方法
protected boolean tryRelease(int arg) {
//如果當(dāng)前線程不是占用鎖的線程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
//拋出非法狀態(tài)的異常
throw new IllegalMonitorStateException();
}
//如果鎖的狀態(tài)為沒有占用
if (getState() == 0) {
//拋出非法狀態(tài)的異常
throw new IllegalMonitorStateException();
}
//接下來不需要使用CAS操作,因為下面的操作不存在并發(fā)場景
setExclusiveOwnerThread(null);
//設(shè)置狀態(tài)
setState(0);
return true;
}
//顯式鎖的搶占方法
@Override
public void lock() {
//委托給同步器的acquire()搶占方法
sync.acquire(1);
}
//顯式鎖的釋放方法
@Override
public void unlock() {
//委托給同步器的release()釋放方法
sync.release(1);
}
//省略其他未實現(xiàn)的方法
}
}
和ReentrantLock相比,SimpleMockLock的代碼非常簡單,這也是為了大家不被ReentrantLock的復(fù)雜代碼鎖困擾,能去更好地聚焦于AQS原理的學(xué)習(xí)。
SimpleMockLock僅僅實現(xiàn)了Lock接口的以下兩種方法:
- lock()方法:完成顯式鎖的搶占。
- unlock()方法:完成顯式鎖的釋放。
SimpleMockLock的鎖搶占和鎖釋放是委托給Sync實例的acquire()方法和release()方法完成的。SimpleMockLock的內(nèi)部類Sync繼承了AQS類,實際上acquire()、release()是AQS的兩個模板方法。在搶占鎖時,AQS的模板方法acquire()會調(diào)用tryAcquire(int arg)鉤子方法;在釋放鎖時,AQS的模板方法release()會調(diào)用tryRelease(int arg)鉤子方法。
內(nèi)部類Sync繼承AQS類時提供了以下兩個鉤子方法的實現(xiàn):
- protected boolean tryAcquire(int arg):搶占鎖的鉤子實現(xiàn)。此方法將鎖的狀態(tài)設(shè)置為1,表示互斥鎖已經(jīng)被占用,并保存當(dāng)前線程。
- protected boolean tryRelease(int arg):釋放鎖的鉤子實現(xiàn)。此方法將鎖的狀態(tài)設(shè)置為0,表示互斥鎖已經(jīng)被釋放。
AQS 鎖搶占的原理
下面基于SimpleMockLock公平獨占鎖的搶占過程詳細(xì)說明AQS鎖搶占的原理。
顯式鎖搶占的總體流程
這里先介紹一下SimpleMockLock鎖搶占的總體流程,具體如圖所示:
流程的第一步,顯式鎖的lock()方法會去調(diào)用同步器基類AQS的模板方法acquire(arg)。
AQS 模板方法:acquire(arg)
acquire是AQS封裝好的獲取資源的公共入口,它是AQS提供的利用獨占方式獲取資源的方法,源碼實現(xiàn)如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
}
通過源碼可以發(fā)現(xiàn),acquire(arg)至少執(zhí)行一次tryAcquire(arg)鉤子方法。tryAcquire(arg)方法默認(rèn)是拋出一個異常,具體的獲取獨占資源state的邏輯需要鉤子方法去實現(xiàn)。在模板方法acquire中,若調(diào)用tryAcquire(arg)嘗試成功,則acquire()將直接返回,表示已經(jīng)搶到鎖;若不成功,則將線程加入等待隊列。
鉤子實現(xiàn):tryAcquire(arg)
SimpleMockLock的tryAcquire()的流程是:CAS操作state字段,將其值從0改為1,若成功,則表示鎖未被占用,可成功占用,并且返回true;若失敗,則獲取鎖失敗,返回false。
SimpleMockLock的實現(xiàn)非常簡單,是不可以重入的,僅僅為了學(xué)習(xí)AQS而編寫。如果是可以重入的鎖,在重復(fù)搶鎖時會累計state字段值,表示重入鎖的次數(shù),具體可參考ReentrantLock源碼。
直接入隊:addWaiter
在acquire模板方法中,如果鉤子方法tryAcquire嘗試獲取同步狀態(tài)失敗,則構(gòu)造同步節(jié)點(獨占式節(jié)點模式為Node.EXCLUSIVE),通過addWaiter(Node node, int args)方法將該節(jié)點加入到同步隊列的隊尾。
private Node addWaiter(Node mode) {
//創(chuàng)建新節(jié)點
Node node = new Node(Thread.currentThread(), mode);
// 加入隊列尾部,將目前的隊列tail作為自己的前驅(qū)節(jié)點pred
Node pred = tail;
// 隊列不為空的時候
if (pred != null) {
node.prev = pred;
// 先嘗試通過CAS方式修改尾節(jié)點為最新的節(jié)點
// 如果修改成功,將節(jié)點加入到隊列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次嘗試添加尾部失敗,意味著有并發(fā)搶鎖發(fā)生,需要進行自旋
enq(node);
return node;
}
在addWaiter()方法中,首先需要構(gòu)造一個Node對象,具體的代碼如下:
Node node = new Node(Thread.currentThread(), mode);
構(gòu)造Node對象所用到的兩個參數(shù)如下:
-
當(dāng)前線程
構(gòu)造Node對象時,將通過Thread.currentThread()獲取到當(dāng)前線程作為第一個參數(shù),該線程會被賦值給Node對象的thread成員屬性,相當(dāng)于將線程與Node節(jié)點進行綁定。在后續(xù)輪到此Node節(jié)點去占用鎖時,就需要其thread屬性獲得需要喚醒的線程。 -
Node共享類型
mode是一個表示Node類型的參數(shù),用于標(biāo)識新節(jié)點是獨占地還是共享地去搶占鎖。mode雖然為Node類型,但是僅僅起到類型標(biāo)識的作用。mode可能的值有兩個,以常量的形式定義在Node類中,具體的代碼如下:static final class Node { /** 常量標(biāo)識:標(biāo)識當(dāng)前的隊列節(jié)點類型為共享型搶占 */ static final Node SHARED = new Node(); /** 常量標(biāo)識:標(biāo)識當(dāng)前的隊列節(jié)點類型為獨占型搶占 */ static final Node EXCLUSIVE = null; //省略其他代碼 }
如果搶占獨占鎖,那么mode值為EXCLUSIVE;如果搶占共享鎖,那么mode值為SHARED。
自旋入隊:enq
addWaiter()第一次嘗試在尾部添加節(jié)點失敗,意味著有并發(fā)搶鎖發(fā)生,需要進行自旋。enq()方法通過CAS自旋將節(jié)點的添加到隊列尾部。
/**
* 這里進行了循環(huán),如果此時存在tail,就執(zhí)行添加隊尾的操作
* 如果依然不存在,就把當(dāng)前線程作為head節(jié)點
* 插入節(jié)點后,調(diào)用acquireQueued()進行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//隊列為空,初始化隊尾節(jié)點和隊首節(jié)點為新節(jié)點
if (compareAndSetHead(new Node())) tail = head;
} else {
// 隊列不為空,將新節(jié)點插入隊列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**CAS操作head指針,僅僅被enq()調(diào)用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**CAS操作tail指針,僅僅被enq()調(diào)用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
自旋搶占:acquireQueued()
在節(jié)點入隊之后(tryAcquire方法沒有拿到鎖時),啟動自旋搶鎖的流程。acquireQueued()方法的主要邏輯:當(dāng)前Node節(jié)點線程在死循環(huán)中不斷獲取同步狀態(tài),并且不斷在前驅(qū)節(jié)點上自旋,只有當(dāng)前驅(qū)節(jié)點是隊首節(jié)點才能嘗試獲取鎖,原因是:
- 隊首節(jié)點是成功獲取同步狀態(tài)(鎖)的節(jié)點,而隊首節(jié)點的線程釋放了同步狀態(tài)以后,將會喚醒其后驅(qū)節(jié)點,后驅(qū)節(jié)點的線程被喚醒后要檢查自己的前驅(qū)節(jié)點是否為隊首節(jié)點。
- 維護同步隊列的FIFO原則,節(jié)點進入同步隊列之后,就進入了一個自旋的過程,每個節(jié)點都在不斷地執(zhí)行for死循環(huán)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋檢查當(dāng)前節(jié)點的前驅(qū)節(jié)點是否為隊首節(jié)點,是才能獲取鎖
for (;;) {
// 獲取節(jié)點的前驅(qū)節(jié)點
final Node p = node.predecessor();
// 節(jié)點中的線程循環(huán)的檢查自己的前驅(qū)節(jié)點是否為head節(jié)點
// 只有前驅(qū)節(jié)點是head時,進一步調(diào)用子類的tryAcquire(…)實現(xiàn)
if (p == head && tryAcquire(arg)) {
// tryAcquire成功后,將當(dāng)前節(jié)點設(shè)置為隊首節(jié)點,移除之前的隊首節(jié)點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 檢查前一個節(jié)點的狀態(tài),預(yù)判當(dāng)前獲取鎖失敗的線程是否要掛起
// 如果需要掛起,調(diào)用parkAndCheckInterrupt方法掛起當(dāng)前線程,直到被喚醒
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; // 若兩個操作都是true,則為true
}
} finally {
//如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了)那么取消節(jié)點在隊列中的等待
if (failed)
//取消請求,將當(dāng)前節(jié)點從隊列中移除
cancelAcquire(node);
}
}
為了不浪費資源,acquireQueued()自旋過程中會阻塞線程,等待前驅(qū)節(jié)點喚醒后才啟動循環(huán)。如果成功就返回,否則執(zhí)行shouldParkAfterFailedAcquire()、parkAndCheckInterrupt()來達到阻塞效果。調(diào)用acquireQueued()方法的線程一定是node所綁定的線程(由它的thread屬性所引用),該線程也是最開始調(diào)用lock()方法搶鎖的那個線程,在acquireQueued()的死循環(huán)中,該線程可能重復(fù)進行阻塞和被喚醒。
AQS隊列上每一個節(jié)點所綁定的線程在搶鎖過程中都會自旋,即執(zhí)行acquireQueued()方法的死循環(huán),也就是說,AQS隊列上每個節(jié)點的線程都不斷自旋,具體如圖所示:
如果隊首節(jié)點獲取了鎖,那么該節(jié)點綁定的線程會終止acquireQueued()自旋,線程會去執(zhí)行臨界區(qū)代碼。此時,其余的節(jié)點處于自旋狀態(tài),處于自旋狀態(tài)的線程當(dāng)然也不會執(zhí)行無效的空循環(huán)而導(dǎo)致CPU資源浪費,而是被掛起(Park)進入阻塞狀態(tài)。AQS隊列的節(jié)點自旋不像CLH節(jié)點那樣在空自旋而耗費資源。
掛起預(yù)判:shouldParkAfterFailedAcquire
acquireQueued()自旋在阻塞自己的線程之前會進行掛起預(yù)判。shouldParkAfterFailedAcquire()方法的主要功能是:找到當(dāng)前節(jié)點的有效前驅(qū)節(jié)點(是指有效節(jié)點不是CANCELLED類型的節(jié)點),并且將有效前驅(qū)節(jié)點的狀態(tài)設(shè)置為SIGNAL,之后返回true代表當(dāng)前線程可以馬上被阻塞了。具體可以分為三種情況:
- 如果前驅(qū)節(jié)點的狀態(tài)為?1(SIGNAL),說明前驅(qū)的等待標(biāo)志已設(shè)好,返回true表示設(shè)置完畢。
- 如果前驅(qū)節(jié)點的狀態(tài)為1(CANCELLED),說明前驅(qū)節(jié)點本身不再等待了,需要跨越這些節(jié)點,然后找到一個有效節(jié)點,再把當(dāng)前節(jié)點和這個有效節(jié)點的喚醒關(guān)系建立好:調(diào)整前驅(qū)節(jié)點的next指針為自己。
- 如果是其他情況:?3(PROPAGATE、共享鎖等待)、?2(CONDITION、條件等待)、0(初始狀態(tài)),那么通過CAS嘗試設(shè)置前驅(qū)節(jié)點為SIGNAL,表示只要前驅(qū)釋放鎖(就會通知其后繼節(jié)點),當(dāng)前節(jié)點就可以搶占鎖了。
其源碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 獲得前驅(qū)節(jié)點的狀態(tài):如果前驅(qū)節(jié)點狀態(tài)為SIGNAL(值為-1)就直接返回
if (ws == Node.SIGNAL) return true;
if (ws > 0) {
// 前驅(qū)節(jié)點以及取消CANCELLED(1)
do {
// 不斷地循環(huán),找到有效前驅(qū)節(jié)點,即非CANCELLED(值為1)類型節(jié)點
//將pred記錄前驅(qū)的前驅(qū)
pred = pred.prev;
//調(diào)整當(dāng)前節(jié)點的prev指針,保持為前驅(qū)的前驅(qū)
node.prev = pred;
} while (pred.waitStatus > 0);
//調(diào)整前驅(qū)節(jié)點的next指針
pred.next = node;
} else {
//如果前驅(qū)狀態(tài)不是CANCELLED,也不是SIGNAL,就設(shè)置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//設(shè)置前驅(qū)狀態(tài)之后,此方法返回值還是為false,表示線程不可用,第二次調(diào)用時被阻塞
return false;
}
在獨占鎖的場景中,此方法shouldParkAfterFailedAcquire()是在acquireQueued()方法的死循環(huán)中被調(diào)用的,由于此方法返回false時acquireQueued()不會阻塞當(dāng)前線程,只有此方法返回true時當(dāng)前線程才阻塞。因此在一般情況下,此方法至少需執(zhí)行兩次,當(dāng)前線程才會被阻塞。
在第一次進入此方法時,首先會進入后一個if判斷的else分支,通過CAS設(shè)置pred前驅(qū)的waitStatus為SIGNAL,然后返回false。此方法返回false之后,獲取獨占鎖的acquireQueued()方法會繼續(xù)進行for循環(huán)去搶鎖:
- 假設(shè)node的前驅(qū)節(jié)點是隊首節(jié)點,tryAcquire()搶鎖成功,則獲取到鎖。
- 假設(shè)node的前驅(qū)節(jié)點仍然不是隊首節(jié)點,或tryAcquire()搶鎖失敗,仍會再次調(diào)用此方法。
第二次進入此方法時,由于上一次進入時已經(jīng)將pred.waitStatus設(shè)置為-1(SIGNAL)了,因此這次會進入第一個判斷條件,直接返回true,表示應(yīng)該調(diào)用parkAndCheckInterrupt阻塞當(dāng)前線程了,等待前一個節(jié)點執(zhí)行完成之后喚醒。
waitStatus 等于?3
什么時候遇到前驅(qū)節(jié)點狀態(tài)waitStatus等于?3(PROPAGATE)的場景呢? PROPAGATE只能在使用共享鎖的時候出現(xiàn),并且只可能設(shè)置在head上。所以,對于非隊尾節(jié)點,如果它的狀態(tài)為0或PROPAGATE,那么它肯定是head。當(dāng)?shù)却犃兄杏卸鄠€節(jié)點時,如果head的狀態(tài)為0或PROPAGATE,說明head處于一種中間狀態(tài),且此時有線程剛才釋放鎖了。而對于搶鎖線程來說,如果檢測到這種狀態(tài),說明再次執(zhí)行acquire()是極有可能獲得鎖的。
waitStatus 大于 0
什么時候會遇到前驅(qū)節(jié)點的狀態(tài)waitStatus大于0的場景呢?當(dāng)pred前驅(qū)節(jié)點的搶鎖請求被取消后期狀態(tài)為CANCELLED(值為1)時,當(dāng)前節(jié)點(如果被喚醒)就會循環(huán)移除所有被取消的前驅(qū)節(jié)點,直到找到未被取消的前驅(qū)。在移除所有被取消的前驅(qū)節(jié)點后,此方法將返回false,再一次去執(zhí)行acquireQueued()的自旋搶占。
waitStatus 等于 0
什么時候遇到前驅(qū)節(jié)點狀態(tài)waitStatus等于0(初始狀態(tài))的場景呢?分為兩種情況:
- node節(jié)點剛成為新隊尾,但還沒有將舊隊尾的狀態(tài)設(shè)置為SIGNAL。
- node節(jié)點的前驅(qū)節(jié)點為head。
前驅(qū)節(jié)點為waitStatus等于0的情況是最常見的。比如現(xiàn)在AQS的等待隊列中有很多節(jié)點正在等待,當(dāng)前線程剛執(zhí)行完畢addWaiter(節(jié)點剛成為新隊尾),然后開始執(zhí)行獲取鎖的死循環(huán)(獨占鎖對應(yīng)的是acquireQueued()里的死循環(huán),共享鎖對應(yīng)的是doAcquireShared()里的死循環(huán)),此時節(jié)點的前驅(qū)(也就是舊隊尾的狀態(tài))肯定還是0(也就是默認(rèn)初始化的值),然后死循環(huán)執(zhí)行兩次,第一次執(zhí)行shouldParkAfterFailedAcquire()自然會檢測到前驅(qū)狀態(tài)為0,然后將0設(shè)置為SIGNAL;第二次執(zhí)行shouldParkAfterFailedAcquire(),由于前驅(qū)節(jié)點為SIGNAL,當(dāng)前線程直接返回true,去執(zhí)行自我阻塞。
線程掛起:parkAndCheckInterrupt()
parkAndCheckInterrupt()主要任務(wù)是暫停當(dāng)前線程,具體如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 調(diào)用park()使線程進入waiting狀態(tài)
return Thread.interrupted();
// 如果被喚醒,查看自己是否已經(jīng)被中斷
}
AbstractQueuedSynchronizer會把所有的等待線程構(gòu)成一個阻塞等待隊列,當(dāng)一個線程執(zhí)行完lock.unlock()時,會激活其后驅(qū)節(jié)點,通過調(diào)用LockSupport.unpark(postThread)完成后繼線程的喚醒。
AQS 兩個關(guān)鍵點:節(jié)點的入隊和出隊
由于AQS的實現(xiàn)非常精妙,因此理解AQS的原理還是比較困難的。理解AQS的原理一個比較重要的關(guān)鍵點在于掌握節(jié)點的入隊和出隊。
節(jié)點的自旋入隊
節(jié)點在第一次入隊失敗后,就會開始自旋入隊,分為以下兩種情況:
- 如果AQS的隊列非空,新節(jié)點通過CAS插入隊列尾部,并且是通過CAS方式插入,插入之后AQS的tail將指向新的尾節(jié)點。
- 如果AQS的隊列為空,新節(jié)點入隊時,AQS通過CAS方法將新節(jié)點設(shè)置為隊首節(jié)點,并且將tail指針指向新節(jié)點。然后自旋,進入CAS插入操作,直到插入成功,自旋才結(jié)束。
節(jié)點的入隊的代碼在enq()方法中,因為enq()非常重要,所以將其代碼重復(fù)如下:
private Node enq(final Node node) {
for (;;) { //自旋入隊
Node t = tail;
if (t == null) {
//隊列為空,初始化隊尾節(jié)點和隊首節(jié)點為新節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果隊列不為空,將新節(jié)點插入隊列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
隊列初始化創(chuàng)建了一個空的隊首節(jié)點,這個空的隊首節(jié)點沒有對應(yīng)的線程,只占用一個位置,等到后面的節(jié)點搶到鎖,這個節(jié)點就被移除。
節(jié)點的出隊
節(jié)點出隊的算法在acquireQueued()方法中,這是一個非常重要的模板方法。acquireQueued()方法通過不斷在前驅(qū)節(jié)點上自旋(for死循環(huán)),如果前驅(qū)節(jié)點是隊首節(jié)點并且當(dāng)前線程使用鉤子方法tryAcquire(arg)獲得了鎖,則移除隊首節(jié)點,將當(dāng)前節(jié)點設(shè)置為隊首節(jié)點。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 在前驅(qū)節(jié)點上自旋
for (;;) {
// 獲取節(jié)點的前驅(qū)節(jié)點
final Node p = node.predecessor();
// (1)前驅(qū)節(jié)點是隊首節(jié)點
// (2)通過子類的tryAcquire()鉤子實現(xiàn)搶占成功
if (p == head && tryAcquire(arg)) {
// 將當(dāng)前節(jié)點設(shè)置為隊首節(jié)點,之前的隊首節(jié)點出隊
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 省略park(無限期阻塞)線程的代碼
}
} finally {
// 省略其他
}
}
節(jié)點加入到隊列尾部后,如果其前驅(qū)節(jié)點就不是隊首節(jié)點,通常情況下,該新節(jié)點所綁定的線程會被無限期阻塞,而不會去執(zhí)行無效循環(huán),從而導(dǎo)致CPU資源的浪費。
問題來了:被無限期阻塞的搶鎖線程,是什么時候被喚醒的呢?
對于公平鎖而言,隊首節(jié)點就是占用鎖的節(jié)點,在釋放鎖時,將會喚醒其后驅(qū)節(jié)點所綁定的線程。后驅(qū)節(jié)點的線程被喚醒后會重新執(zhí)行以上acquireQueued()的自旋(for死循環(huán))搶鎖邏輯,檢查自己的前驅(qū)節(jié)點是否為隊首節(jié)點,如果是,在搶鎖成功之后會移除舊的隊首節(jié)點。
**AQS釋放鎖時是如何喚醒后繼線程的呢?**AQS釋放鎖的核心代碼如下:
public final boolean release(long arg) {
if (tryRelease(arg)) {
// 釋放鎖的鉤子實現(xiàn)
Node h = head; //隊列的隊首節(jié)點
if (h != null && h.waitStatus != 0) unparkSuccessor(h);
//喚醒后驅(qū)線程
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 省略不相關(guān)代碼
Node s = node.next; //后驅(qū)節(jié)點
// 省略不相關(guān)代碼
if (s != null) LockSupport.unpark(s.thread);
//喚醒后驅(qū)的線程
}
通過以上分析可以看出:無效節(jié)點的出隊操作是在喚醒后驅(qū)節(jié)點的線程之后,其后驅(qū)節(jié)點的線程在搶鎖過程中完成的。
AQS 鎖釋放的原理
下面基于SimpleMockLock公平獨占鎖的釋放過程詳細(xì)說明AQS鎖釋放的原理。
SimpleMockLock 獨占鎖的釋放流程
SimpleMockLock獨占鎖的釋放流程如圖所示:
AQS 模板方法:release()
SimpleMockLock的unlock()方法被調(diào)用時,會調(diào)用AQS的release(…)的模板方法。AQS的release(…)的模板方法代碼如下:
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
這段代碼邏輯比較簡單,如果同步狀態(tài)的鉤子方法執(zhí)行成功(tryRelease返回true),就會執(zhí)行if塊中的代碼,當(dāng) head 指向的隊首節(jié)點不為null ,并且該節(jié)點的狀態(tài)值不為0時才會執(zhí)行unparkSuccessor()方法。
鉤子方法tryRelease()方法嘗試釋放當(dāng)前線程持有的資源,由子類提供具體的實現(xiàn)。
鉤子實現(xiàn):tryRelease()
tryRelease()方法是需要子類提供實現(xiàn)的一個鉤子方法,需要子類根據(jù)具體業(yè)務(wù)去實現(xiàn)。SimpleMockLock的鉤子實現(xiàn)如下:
//鉤子方法
protected boolean tryRelease(int arg) {
//如果當(dāng)前線程不是占用鎖的線程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
//拋出非法狀態(tài)的異常
throw new IllegalMonitorStateException();
}
//如果鎖的狀態(tài)為沒有占用
if (getState() == 0) {
//拋出非法狀態(tài)的異常
throw new IllegalMonitorStateException();
}
//接下來不需要使用CAS操作,因為下面的操作不存在并發(fā)場景
setExclusiveOwnerThread(null);
//設(shè)置狀態(tài)
setState(0);
return true;
}
核心邏輯是設(shè)置同步狀態(tài)state的值為0,方便后驅(qū)節(jié)點執(zhí)行搶占。
喚醒后驅(qū):unparkSuccessor()
release()鉤子執(zhí)行了tryRelease()鉤子成功之后,使用unparkSuccessor()喚醒后驅(qū)節(jié)點,具體的代碼如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 獲得節(jié)點狀態(tài),釋放鎖的節(jié)點,也就是隊首節(jié)點
//CANCELLED(1)、SIGNAL(-1)、CONDITION (-2)、PROPAGATE(-3)
//如果隊首節(jié)點狀態(tài)小于0,則將其置為0,表示初始狀態(tài)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到后面的一個節(jié)點
if (s == null || s.waitStatus > 0) {
// 如果新節(jié)點已經(jīng)被取消CANCELLED(1)
s = null;
//從隊列尾部開始,往前去找最前面的一個waitStatus小于0的節(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t;
}
//喚醒后驅(qū)節(jié)點對應(yīng)的線程
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor()喚醒后驅(qū)節(jié)點的線程后,后驅(qū)節(jié)點的線程重新執(zhí)行方法acquireQueued()中的自旋搶占邏輯。
當(dāng)AQS隊首節(jié)點釋放鎖之后,隊首節(jié)點的狀態(tài)變成初始狀態(tài),此節(jié)點理論上需要從隊列中移除,但是此時該無效節(jié)點并沒有立即被移除,unparkSuccessor()方法并沒有立即從隊列中刪除該無效節(jié)點,僅僅喚醒了后驅(qū)節(jié)點的線程,重啟了后驅(qū)節(jié)點的自旋搶鎖。
ReentrantLock 的搶鎖流程
下 面 結(jié) 合AbstractQueuedSynchronizer()的 模 板 方 法 詳細(xì) 說 明 ReentrantLock 的 實 現(xiàn) 過 程 。ReentrantLock有兩種模式:
- 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖。
- 非公平鎖:當(dāng)線程要獲取鎖時,無視隊列順序直接去搶鎖,誰搶到就是誰的。
ReentrantLock在同一個時間點只能被一個線程獲取,ReentrantLock是通過一個FIFO的等待隊列(AQS隊列)來管理獲取該鎖所有線程的。ReentrantLock是繼承自Lock接口實現(xiàn)的獨占式可重入鎖,并且ReentrantLock組合一個AQS內(nèi)部實例完成同步操作。
ReentrantLock 非公平鎖的搶占流程
ReentrantLock非公平鎖的搶占的總體流程如圖所示:
非公平鎖的同步器子類
ReentrantLock為非公平鎖實現(xiàn)了一個內(nèi)部的同步器——NonfairSync,其顯式鎖獲取方法lock()的源碼如下:
static final class NonfairSync extends Sync {
//非公平鎖搶占
final void lock() {
if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());
else acquire(1);
}
//省略其他
}
首先用一個CAS操作,判斷state是否是0(表示當(dāng)前鎖未被占用),如果是0就把它置為1,并且設(shè)置當(dāng)前線程為該鎖的獨占線程,表示獲取鎖成功。當(dāng)多個線程同時嘗試占用同一個鎖時,CAS操作只能保證一個線程操作成功,剩下的只能乖乖去排隊。
ReentrantLock“非公平”性即體現(xiàn)在這里:如果占用鎖的線程剛釋放鎖,state置為0,而排隊等待鎖的線程還未喚醒,新來的線程就直接搶占了該鎖,那么就“插隊”了。舉一個例子:當(dāng)前有三個線程A、B、C去競爭鎖,假設(shè)線程A、B在排隊,但是后來的C直接進行CAS操作成功了,拿到了鎖開開心心地返回了,那么線程A、B只能乖乖看著。
非公平搶占的鉤子方法:tryAcquire(arg)
如果非公平搶占沒有成功,非公平鎖的lock會執(zhí)行模板方法acquire(),首先會調(diào)用到鉤子方法tryAcquire(arg)。非公平搶占的鉤子方法實現(xiàn)如下:
static final class NonfairSync extends Sync {
//非公平鎖搶占的鉤子方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//省略其他
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先直接獲得鎖的狀態(tài)
int c = getState();
if (c == 0) {
// 如果任務(wù)隊列首節(jié)點的線程完了,它會將鎖的state設(shè)置為0
// 當(dāng)前搶鎖線程的下一步就是直接進行搶占,不管不顧
// 發(fā)現(xiàn)state是空的,就直接拿來加鎖使用,根本不考慮后面后驅(qū)者的存在
if (compareAndSetState(0, acquires)) {
// 1. 利用CAS自旋方式判斷當(dāng)前state確實為0,然后設(shè)置成acquire(1)
// 這是原子性的操作,可以保證線程安全
setExclusiveOwnerThread(current);
// 設(shè)置當(dāng)前執(zhí)行的線程,直接返回true
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 2. 當(dāng)前的線程和執(zhí)行中的線程是同一個,也就意味著可重入操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 表示當(dāng)前鎖被1個線程重復(fù)獲取了nextc次
return true;
}
// 否則就是返回false,表示沒有嘗試成功獲取當(dāng)前鎖,進入排隊過程
return false;
}
//省略其他
}
非公平同步器ReentrantLock.NonfairSync的核心思想就是當(dāng)前進程嘗試獲取鎖的時候,如果發(fā)現(xiàn)鎖的狀態(tài)位是0,就直接嘗試將鎖拿過來,然后執(zhí)行setExclusiveOwnerThread(),根本不管同步隊列中的排隊節(jié)點。
ReentrantLock 公平鎖的搶占流程
ReentrantLock公平鎖的搶占流程如圖所示:
公平鎖的同步器子類
ReentrantLock為公平鎖實現(xiàn)了一個內(nèi)部的同步器——FairSync,其顯式鎖獲取方法lock的源碼如下:
static final class FairSync extends Sync {
//公平鎖搶占的鉤子方法
final void lock() {
acquire(1);
}
//省略其他
}
公平同步器ReentrantLock.FairSync的核心思想是通過AQS模板方法去進行隊列入隊操作。
公平搶占的鉤子方法:tryAcquire(arg)
公平鎖的lock會執(zhí)行模板方法acquire,該方法首先會調(diào)用鉤子方法tryAcquire(arg)。公平搶占的鉤子方法實現(xiàn)如下:
static final class FairSync extends Sync {
//公平搶占的鉤子方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//鎖狀態(tài)
if (c == 0) {
//有前驅(qū)節(jié)點就返回,足夠講義氣
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平搶占的鉤子方法中,首先判斷是否有后驅(qū)節(jié)點,如果有后驅(qū)節(jié)點,并且當(dāng)前線程不是鎖的占有線程,鉤子方法就返回false,模板方法會進入排隊的執(zhí)行流程,可見公平鎖是真正公平的。
是否有后驅(qū)節(jié)點的判斷
FairSync進行是否有后驅(qū)節(jié)點的判斷代碼如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors的執(zhí)行場景大致如下:
- 當(dāng)h!=t不成立的時候,說明h隊首節(jié)點、t尾節(jié)點要么是同一個節(jié)點,要么都是null,此時hasQueuedPredecessors()返回false,表示沒有后驅(qū)節(jié)點。
- 當(dāng)h!=t成立的時候,進一步檢查head.next是否為null,如果為null,就返回true。什么情況下h!=t同時h.next==null呢?有其他線程第一次正在入隊時可能會出現(xiàn)。其他線程執(zhí)行AQS的enq()方法,compareAndSetHead(node) 完 成 , 還 沒 執(zhí) 行 tail=head 語 句 時 , 此 時 t=null 、 head=new Node() 、head.next=null。
- 如果h!=t成立,head.next != null,判斷head.next是不是當(dāng)前線程,如果是就返回false,否則返回true。
head節(jié)點是獲取到鎖的節(jié)點,但是任意時刻head節(jié)點可能占用著鎖,也可能釋放了鎖,如果釋放了鎖,那么此時state=0,未被阻塞的head.next節(jié)點對應(yīng)的線程在任意時刻都是在自旋地嘗試獲取鎖。
AQS 條件隊列
Condition是JUC用來替代傳統(tǒng)的Object的wait()/notify()線程間通信與協(xié)作機制的新組件,相比使用Object的wait()/notify(),使用Condition的await()/signal()這種方式實現(xiàn)線程間協(xié)作更加高效。
Condition 基本原理
Condition與Object的wait()/notify()作用是相似的,都是使得一個線程等待某個條件(Condition),只有當(dāng)該條件具備signal()或者signalAll()方法被調(diào)用時等待線程才會被喚醒,從而重新爭奪鎖。不同的是,Object的wait()/notify()由JVM底層實現(xiàn),而Condition接口與實現(xiàn)類完全使用Java代碼實現(xiàn)。當(dāng)需要進行線程間的通信時,建議結(jié)合使用ReentrantLock與Condition,通過Condition的await()和
signal()方法進行線程間的阻塞與喚醒。
ConditionObject類是實現(xiàn)條件隊列的關(guān)鍵,每個ConditionObject對象都維護一個單獨的條件等待對列。每個ConditionObject對應(yīng)一個條件隊列,它記錄該隊列的隊首節(jié)點和尾節(jié)點。
public class ConditionObject implements Condition, java.io.Serializable {
//記錄該隊列的隊首節(jié)點
private transient Node firstWaiter;
//記錄該隊列的尾節(jié)點
private transient Node lastWaiter;
}
一個Condition對象是一個單條件的等待隊列,具體如圖所示:
在一個顯式鎖上,我們可以創(chuàng)建多個等待任務(wù)隊列,這點和內(nèi)置鎖不同,Java內(nèi)置鎖上只有唯一的一個等待隊列。比如,我們可以使用newCondition()創(chuàng)建兩個等待隊列,具體如下:
private Lock lock = new ReentrantLock();
//創(chuàng)建第一個等待隊列
private Condition firstCond = lock.newCondition();
//創(chuàng)建第二個等待隊列
private Condition secondCond = lock.newCondition();
Condition條件隊列與AQS同步隊列的關(guān)系如圖所示:
Condition條件隊列是單向的,而AQS同步隊列是雙向的,AQS節(jié)點會有前驅(qū)指針。一個AQS實例可以有多個條件隊列,是聚合關(guān)系;但是一個AQS實例只有一個同步隊列,是邏輯上的組合關(guān)系。
await()等待方法原理
當(dāng)線程調(diào)用await()方法時,說明當(dāng)前線程的節(jié)點為當(dāng)前AQS隊列的隊首節(jié)點,正好處于占有鎖的狀態(tài),await()方法需要把該線程從AQS隊列挪到Condition等待隊列里,如圖所示:
在await()方法中將當(dāng)前線程挪動到Condition等待隊列后,還會喚醒AQS同步隊列中head節(jié)點的下一個節(jié)點。await()方法的整體流程如下:
- 執(zhí)行await()時,會新創(chuàng)建一個節(jié)點并放入到Condition隊列尾部。
- 然后釋放鎖,并喚醒AQS同步隊列中的隊首節(jié)點的后一個節(jié)點。
- 然后執(zhí)行while循環(huán),將該節(jié)點的線程阻塞,直到該節(jié)點離開等待隊列,重新回到同步隊列成為同步節(jié)點后,線程才退出while循環(huán)。
- 退出循環(huán)后,開始調(diào)用acquireQueued()不斷嘗試拿鎖。
- 拿到鎖后,會清空Condition隊列中被取消的節(jié)點。
創(chuàng)建一個新節(jié)點并放入Condition隊列尾部的工作由addConditionWaiter()方法完成,該方法具體如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾節(jié)點取消,重新定位尾節(jié)點
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//創(chuàng)建一個新Node,作為等待節(jié)點
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//將新Node加入等待隊列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal()喚醒方法原理
線程在某個ConditionObject對象上調(diào)用signal()方法后,等待隊列中的firstWaiter會被加入到同步隊列中,等待節(jié)點被喚醒,流程如圖所示:
signal()方法的整體流程如下:
- 通過enq()方法自旋(該方法已經(jīng)介紹過),將條件隊列中的隊首節(jié)點放入到AQS同步隊列尾部,并獲取它在AQS隊列中的前驅(qū)節(jié)點。
- 如果前驅(qū)節(jié)點的狀態(tài)是取消狀態(tài),或者設(shè)置前驅(qū)節(jié)點為Signal狀態(tài)失敗,就喚醒當(dāng)前節(jié)點的線程;否則節(jié)點在同步隊列的尾部,參與排隊。
- 同步隊列中的線程被喚醒后,表示重新獲取了顯式鎖,然后繼續(xù)執(zhí)行condition.await()語句后面的臨界區(qū)代碼。
節(jié)點入隊的時機
在介紹完AQS之后總結(jié)一下,節(jié)點入隊AQS的時機,這里暫時梳理了兩個時機和三種細(xì)分場景。
-
時機一:在模板方法acquire()中,如果調(diào)用tryAcquire(arg)嘗試成功,acquire()將直接返回,表示已經(jīng)搶到鎖;如果不成功,則開始將線程加入等待隊列。這里分為三種場景:
-
模板方法acquire(arg)通過addWaiter(Node node, int args)方法,嘗試將該節(jié)點加入到同步隊
列的隊尾,在存在競爭的場景時一般會成功。當(dāng)然,如果加入失敗,或者同步隊列為空,就開始調(diào)
用enq(final Node node)自旋入隊。 -
enq()方法通過CAS自旋將新節(jié)點插入隊列尾部。具體來說,如果AQS的隊列非空,新節(jié)點
入隊的插入位置在隊列的尾部,并且是通過CAS方式插入的,插入之后AQS的tail將指向新節(jié)點,
新節(jié)點作為尾節(jié)點。 -
enq()方法初始化AQS隊列再執(zhí)行CAS自旋。如果AQS的隊列為空,新節(jié)點入隊時首先進行
隊列初始化,AQS通過CAS方法創(chuàng)建隊首節(jié)點,并且將tail指針指向隊首節(jié)點。然后自旋,進入CAS
自旋插入操作,直到插入成功,自旋才結(jié)束。
-
-
時機二:Condition等待隊列上的節(jié)點被signal()喚醒,會通過enq(final Node node)自旋入隊,插入AQS的尾部。
AQS 的實際應(yīng)用
首先介紹一下JUC的總體架構(gòu),如圖所示:
AQS建立在CAS原子操作和volatile可見性變量的基礎(chǔ)之上,為上層的顯式鎖、同步工具類、阻塞隊列、線程池、并發(fā)容器、Future異步工具提供線程之間同步的基礎(chǔ)設(shè)施。所以,AQS在JUC框架中的使用是非常廣泛的。文章來源:http://www.zghlxwxcb.cn/news/detail-676435.html
感謝耐心看到這里的同學(xué),覺得文章對您有幫助的話希望同學(xué)們不要吝嗇您手中的贊,動動您智慧的小手,您的認(rèn)可就是我創(chuàng)作的動力!
之后還會勤更自己的學(xué)習(xí)筆記,感興趣的朋友點點關(guān)注哦。文章來源地址http://www.zghlxwxcb.cn/news/detail-676435.html
到了這里,關(guān)于萬字長文解析AQS抽象同步器核心原理(深入閱讀AQS源碼)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!