1 概要
AQS在類的注釋上說的已經(jīng)很明白,提供一個框架,用于實(shí)現(xiàn)依賴先進(jìn)先出(FIFO)等待隊列的阻塞鎖和相關(guān)同步器(信號量、事件等)。此類被設(shè)計做為大多數(shù)類型的同步器的一個有用的基礎(chǔ)類,這些同步器依賴于單個原子int值(state字段)來表示狀態(tài)。
java 并發(fā)編程系列文章目錄
2 技術(shù)名詞解釋
CAS:cas是比較并交換(compare and swap)的縮寫,java對其具體實(shí)現(xiàn)是Usafe類,它有一系列的compareAndSwap方法
3 AQS核心方法原理
3.1 acquire(int arg)
從代碼邏輯上可以看到,首先嘗試獲取,如果此時返回true, 執(zhí)行結(jié)束。相當(dāng)于獲取到鎖了。tryAcquire方法是抽象方法,比如公平鎖和非公平鎖的不同實(shí)現(xiàn)邏輯
如果嘗試獲取失敗,會進(jìn)入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//沒獲取到鎖但是park住了,如果unpack且如果在等待的過程中發(fā)生中斷走到這,進(jìn)行中斷位標(biāo)記
selfInterrupt();
}
此時具體看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)邏輯。
- addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
//創(chuàng)建Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//先嘗試一次原子加入鏈表尾部中
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//走下述邏輯,上面嘗試未成功,或者此時tail == null(因?yàn)槌跏蓟臅r候都是null 在后續(xù)才會設(shè)置值)
enq(node);
return node;
}
private Node enq(final Node node) {
//死循環(huán)+原子性加到列表中
for (;;) {
Node t = tail;
if (t == null) { // 初始化頭結(jié)點(diǎn)和尾結(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- acquireQueued(final Node node, int arg)
因?yàn)閍ddWaiter(Node mode)最終是死循環(huán)+原子性加到列表中,是肯定成功的。也就意味著需要阻塞當(dāng)前線程了。但是有沒有挽救的余地呢?
todo1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當(dāng)前線程節(jié)點(diǎn)的前一個節(jié)點(diǎn),因?yàn)樵谏喜揭呀?jīng)添加到tail中了
final Node p = node.predecessor();
//如果此時前一個節(jié)點(diǎn)時頭節(jié)點(diǎn),因?yàn)樵诟乓幸呀?jīng)說明,先進(jìn)先出隊列,所以此時前面就一個頭
//結(jié)點(diǎn),此時你就是阻塞隊列第一個,那沒線程阻塞啊,所以此時再嘗試下獲取鎖
if (p == head && tryAcquire(arg)) {
//此時獲取到鎖,那么線程安全的情況下設(shè)置頭結(jié)點(diǎn)的node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//嘗試還是沒獲取到鎖,那么原子設(shè)置pred.waitStatus = -1。waitStatus 狀態(tài)下面會描述
if (shouldParkAfterFailedAcquire(p, node) &&
//設(shè)置pred.waitStatus = -1成功了 會park阻塞線程,并返回該過程中是否被中斷過
parkAndCheckInterrupt())
//這地方是unpark 或者中斷之后的邏輯,又會回到上面去獲取鎖,如果pre不是頭節(jié)點(diǎn),又會被park住。todo1 做個標(biāo)記
interrupted = true;
}
} finally {
if (failed)//正常情況下不會到這
cancelAcquire(node);
}
}
總結(jié)acquire(int arg):在多線程的情況下,有一個線程tryAcquire獲取到鎖,其余的cas進(jìn)入等待隊列,當(dāng)然頭節(jié)點(diǎn)的下一個或嘗試下,沒有獲取成功的都會被park當(dāng)前線程。進(jìn)入阻塞狀態(tài)等待被喚醒
3.2 release(int arg)
先看整體邏輯,和獲取鎖是一致且相反的邏輯。先嘗試釋放鎖,釋放失敗直接false。還可以釋放失???? 如果鎖是可重入的,那么需要把state縮減成0才算釋放。
tryRelease(arg)是個接口方法,子類實(shí)現(xiàn)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
//如果成功了,
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//waitStatus下面描述
int ws = node.waitStatus;
if (ws < 0)
//把當(dāng)前節(jié)點(diǎn)的waitStatus修改成0,如果失敗了呢,好像也不影響,
compareAndSetWaitStatus(node, ws, 0);
//s.waitStatus > 0就是線程取消狀態(tài),也即無效的節(jié)點(diǎn),再次剔除掉找到有效的節(jié)點(diǎn),從尾往頭找
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//正常情況下從頭取等待的線程喚醒,但是如果頭節(jié)點(diǎn)的next節(jié)點(diǎn)是被取消了,那就從尾部找了,好像也不是嚴(yán)格的先進(jìn)先出啊....
if (s != null)
LockSupport.unpark(s.thread);
}
3.3 acquireInterruptibly(int arg)
相對于3.1的acquire(int arg),該方法會對線程獲取鎖的過程中線程發(fā)生中斷拋出異常
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//先嘗試獲取鎖,未獲取到進(jìn)度會拋異常的獲取鎖方法
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//對比下
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//區(qū)別就在這,當(dāng)parkAndCheckInterrupt()返回true時,也就是獲取鎖結(jié)果被阻塞住了,期間發(fā)生線程中斷會拋出打斷異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3 acquireShared(int arg)
獲取共享鎖。同比上述排它鎖,相同邏輯先嘗試獲取共享鎖tryAcquireShared(arg),然后進(jìn)入doAcquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//和上述一樣添加阻塞隊列,只不過是共享標(biāo)識
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//和上述一樣排它鎖一樣
int r = tryAcquireShared(arg);
if (r >= 0) {
//區(qū)別在這 獨(dú)占鎖setHead(node);只是設(shè)置了頭節(jié)點(diǎn),但是共享鎖在獲取到鎖之后,不僅要設(shè)置頭節(jié)點(diǎn),
//此時還需要判斷后續(xù)節(jié)點(diǎn)。因?yàn)槭枪蚕淼?,你拿到鎖之后,后續(xù)鏈表中挨著的共享節(jié)點(diǎn)也可以被unpark
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
//和獨(dú)占鎖一樣,不帶Interruptibly的方法只是幫你設(shè)置下中斷位
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//和獨(dú)占鎖一樣,不帶Interruptibly的方法只是幫你設(shè)置下中斷位
//此時這喚醒之后 是不是又到循環(huán)上面去了,這個節(jié)點(diǎn)就是頭結(jié)點(diǎn)的next節(jié)點(diǎn)
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//此時node就是頭節(jié)點(diǎn) 設(shè)置進(jìn)去
setHead(node);
//這是共享鎖在獲取到鎖之后的多余的操作,就是順著鏈表傳播下去。為什么呢?舉個場景,共享讀鎖 如果此時下一個節(jié)點(diǎn)還是共享鎖,其實(shí)是需要喚醒的,此情況下是鏈表中的順序 獨(dú)占,共享,共享,共享這種情況
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//
if (s == null || s.isShared())
//如果下一個節(jié)點(diǎn)是共享的就釋放鎖
doReleaseShared();
}
}
}
3.4 doReleaseShared()
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//可能有其他線程調(diào)用doReleaseShared(),unpark操作只需要其中一個調(diào)用就行了
//正常情況下就是修改成0,unpark改線程,unparkSuccessor上面已描述
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果上述失敗,修改成-3 為了同時release的情況,此時設(shè)想下,如果不改成-3,只變成0 release就下不來了,就會一直hang住
//為啥是-3呢 我猜是符合<0的條件吧
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
3.5 releaseShared(int arg)
上面已描述
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3.6 acquireSharedInterruptibly
按照上面的思路很明白的知道會主動拋出中斷異常的獲取鎖的方法
3.7 hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//就是判斷當(dāng)前阻塞隊列里是否有阻塞的線程node
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
總結(jié)acquireShared:可以資源共享,共享鎖,相對于上述獨(dú)占鎖,在創(chuàng)建Node的時候會標(biāo)記成SHARED,釋放資源喚醒頭結(jié)點(diǎn)的next節(jié)點(diǎn)的時候會喚醒緊挨著的SHARED Node節(jié)點(diǎn)嗎,當(dāng)然是一個喚醒另一個,因?yàn)樵趇f (p == head)的那里循環(huán)產(chǎn)生的。文章來源:http://www.zghlxwxcb.cn/news/detail-687989.html
4 總結(jié)
如類名一樣,通過Queue的Node鏈表來保存阻塞的線程信息,通過state字段的原子操作代表獲取到的資源情況,這個字段是給子類實(shí)現(xiàn)用的。
關(guān)于AQS的ConditionObject詳解請看詳解二
java并發(fā)編程 AbstractQueuedSynchronizer(AQS)詳解二文章來源地址http://www.zghlxwxcb.cn/news/detail-687989.html
到了這里,關(guān)于java并發(fā)編程 AbstractQueuedSynchronizer(AQS)詳解一的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!