AQS自身屬性:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
Node屬性:
// 共享
static final Node SHARED = new Node();
// 獨(dú)占
static final Node EXCLUSIVE = null;
// 線程被取消了
static final int CANCELLED = 1;
// 后繼線程需要喚醒
static final int SIGNAL = -1;
// 等待condition喚醒
static final int CONDITION = -2;
// 共享式同步狀態(tài)獲取將會(huì)無(wú)條件地傳播下去
static final int PROPAGATE = -3;
// 初始為0,狀態(tài)是上面的幾種
volatile int waitStatus; // 很重要
// 前置節(jié)點(diǎn)
volatile Node prev;
// 后置節(jié)點(diǎn)
volatile Node next;
volatile Thread thread;
-
1. 以ReentrantLock為例,其他類舉一反三,方法lock()
-
2. Lock接口實(shí)現(xiàn)類,基本都是通過(guò)【聚合】了一個(gè) 【隊(duì)列同步器】的子類完成線程訪問控制的Sync實(shí)現(xiàn)lock;Sync繼承AQS;Sync又有兩個(gè)實(shí)現(xiàn)類。
-
// 公平與非公平鎖的構(gòu)造 public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
?公平鎖與非公平鎖的實(shí)現(xiàn):
-
1.0 // 非公平鎖 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) // 這里一開始省略了acquire,能搶到直接搶,搶不到再排隊(duì) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);// 實(shí)現(xiàn)展開在下面 } } 1.1 // 非公平鎖的 tryAcquire實(shí)現(xiàn)細(xì)節(jié) final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 這里判斷狀態(tài),1是被占用,0是不被占用。再嘗試搶一下,如果正好線程釋放鎖那么就正好搶到 if (compareAndSetState(0, acquires)) { // 這里和公平鎖相比缺少了一個(gè)hasQueuedPredecessors() setExclusiveOwnerThread(current); // 設(shè)置獲取排他鎖的線程 return true; // } } else if (current == getExclusiveOwnerThread()) { // 這里判斷是不是自己持有鎖,自己持有就進(jìn)去,畢竟可重入鎖 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; // 即不空閑也不是自己占用鎖,則返回false } 2.0 // 公平鎖: static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); // acquire方法 -> tryAcquire(arg) -> protected boolean tryAcquire(int arg) // tryAcquire是父類定義的,這里使用了模版設(shè)計(jì)模式 // 見下面的函數(shù):2.1 } protected final boolean tryAcquire(int acquires) { // tryAcquire的實(shí)現(xiàn) final Thread current = Thread.currentThread(); int c = getState(); // 獲取狀態(tài)位 if (c == 0) { if (!hasQueuedPredecessors() && // 公平鎖分支,公平與非公平就這里不一樣,詳細(xì)在下面2.2 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 2.1 // 父類不實(shí)現(xiàn),下放到子類實(shí)現(xiàn) // 子類有很多實(shí)現(xiàn)類,這里參考FairSync的實(shí)現(xiàn),代碼在上面 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } 2.2 // hasQueuedPredecessors // 邏輯很簡(jiǎn)單,就是判斷一下隊(duì)列前面是否有排隊(duì)線程,如果返回true,說(shuō)明有一個(gè)排隊(duì)的線程;返回false說(shuō)明這個(gè)線程是隊(duì)列的頭 public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
接下來(lái)以非公平鎖為突破口:
-
1. 對(duì)比公平鎖和非公平鎖的 tryAcquire()方法的實(shí)現(xiàn)代碼,其實(shí)差別就在于非公平鎖獲取鎖時(shí)比公平鎖中少了一個(gè)判斷hasQuevedPredecessors()
-
2. hasQueuedPredecessors() 中判斷了是否需要排隊(duì),導(dǎo)致公平鎖和非公平鎖的差異如下:
-
1. 公平鎖:公平鎖講究先來(lái)先到,線程在獲取鎖時(shí)如果這個(gè)鎖的等待隊(duì)列中己經(jīng)有線程在等待,那么當(dāng)前線程會(huì)進(jìn)入等待隊(duì)列中
-
2. 非公平鎖:不管是否有等待隊(duì)列,如果可以獲取鎖,則立刻占有鎖對(duì)象。也就是說(shuō)隊(duì)列的第一個(gè)排隊(duì)線程蘇醒后,不一定就是排頭的這個(gè)線程得鎖,它還是需要參加競(jìng)爭(zhēng)鎖(存在線程競(jìng)爭(zhēng)的情況下),后來(lái)的線程可能不講武德插隊(duì)奪鎖了。
-
?
?
引出源碼三部曲:
-
1. tryAcquire
-
2. addWaiter
-
3. acquireQueued
lock方法:
// 對(duì)比公平與非公平的lock方法:
// 公平鎖lock
final void lock() {
acquire(1);
}
// 非公平鎖
final void lock() {
if (compareAndSetState(0, 1)) // CAS操作直接搶鎖
setExclusiveOwnerThread(Thread.currentThread()); // 設(shè)置擁有獨(dú)占量的線程
else
acquire(1); // 搶不到再去排隊(duì)
}
// 這里非公平的復(fù)雜,我們看非公平鎖的代碼:
acquire三大流向:
?
1.0 acquire方法
// 公平與非公平的acquire方法一樣的
public final void acquire(int arg) {
if (!tryAcquire(arg) && // tryAcquire方法是再次嘗試搶一下鎖,看看是否空閑或者自己持有鎖,具體實(shí)現(xiàn)在前面的代碼里。這里如果失敗就是false,取反就是true,繼續(xù)往下走,執(zhí)行addWaiter
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Node.EXCLUSIVE表示獨(dú)占排他
selfInterrupt();
}
2.0 tryAcquire方法
// acquire方法第一個(gè)執(zhí)行的方法,也是分支的第一支
// 頂層父類并未實(shí)現(xiàn)改方法,AQS類中
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 公平鎖和非公平鎖都對(duì)此方法進(jìn)行了實(shí)現(xiàn),非公平只是缺少了hasQueuedPredecessors()方法,代碼在前面有展示
3.0 addWaiter方法
// 如果執(zhí)行2.0搶鎖失敗
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 封裝線程,進(jìn)入隊(duì)列
Node pred = tail; // 設(shè)置前置為tail
if (pred != null) { // 如果pred不為空,意思就是不是隊(duì)列的第一個(gè)元素,執(zhí)行下列操作
node.prev = pred; // 更新前置節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) { // 3.0.1CAS的修改尾部元素,并且直接返回node,不再執(zhí)行enq入隊(duì)操作
pred.next = node;
return node;
}
}
enq(node); // 進(jìn)入隊(duì)列,當(dāng)且僅當(dāng)node是pred==null時(shí)才會(huì)執(zhí)行
return node;
}
3.1 enq方法:
// 入隊(duì)操作:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 如果tail為空,則需要初始化,這里頭節(jié)點(diǎn)并沒有使用實(shí)參node,而是new了一個(gè)空節(jié)點(diǎn),
if (compareAndSetHead(new Node())) // CAS的修改隊(duì)首,期望值為null,改為空節(jié)點(diǎn),也叫虛擬節(jié)點(diǎn),作用就是站位
tail = head; // 將頭尾指針都指向這個(gè)節(jié)點(diǎn)
} else { // 如果tail不為空(可能一開始tail為空,但是是一個(gè)for true循環(huán),第一次初始化之后就會(huì)走這一步)
node.prev = t; // 將t設(shè)置為node節(jié)點(diǎn)的前置節(jié)點(diǎn),
if (compareAndSetTail(t, node)) { //CAS的修改尾節(jié)點(diǎn),這部分代碼邏輯和3.0.1 邏輯一樣
t.next = node;
return t;
}
}
}// 注意這是個(gè)循環(huán),保證這個(gè)里面一定會(huì)執(zhí)行到else,
}
4.0 acquireQueued方法
// 解釋了線程節(jié)點(diǎn)如何在隊(duì)列里自旋
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 中斷標(biāo)記
for (;;) {
final Node p = node.predecessor(); // 獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) { // 如果前置節(jié)點(diǎn)是head,再次嘗試獲取搶鎖
setHead(node); // 細(xì)節(jié)見4.1
p.next = null; // 意思是將獲取到鎖的節(jié)點(diǎn)變成虛擬頭節(jié)點(diǎn),之前的虛擬頭節(jié)點(diǎn)廢棄掉,直接GC
failed = false; // 入隊(duì)成功,則改為false
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 細(xì)節(jié)見4.2 既改變狀態(tài),又返回boolean值
parkAndCheckInterrupt()) // 細(xì)節(jié)見4.3 等待lockSupport發(fā)放許可
interrupted = true; // 修改interrupted為true
}
} finally {
if (failed) // 如果入隊(duì)失敗,則取消排隊(duì)
cancelAcquire(node);
}
}
4.1 setHead
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
4.2 shouldParkAfterFailedAcquire方法
//
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 此處pred就是head節(jié)點(diǎn)
int ws = pred.waitStatus; // head節(jié)點(diǎn)狀態(tài)默認(rèn)為wait 0
if (ws == Node.SIGNAL) // 如果是-1則執(zhí)行
return true;
if (ws > 0) { // 大于0
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // status = 0跑到這里來(lái)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 后面的節(jié)點(diǎn)將他前面的節(jié)點(diǎn)status改為-1
}
return false;
}
4.3 parkAndCheckInterrupt
// 發(fā)放許可并不是卡住線程的關(guān)鍵,他只是保證,進(jìn)入隊(duì)列的線程節(jié)點(diǎn)一定是按照順序一個(gè)個(gè)執(zhí)行的
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 從acquireQueued方法過(guò)來(lái),如果沒有獲得許可就一直在這卡著
return Thread.interrupted(); // 正常執(zhí)行,沒有意外或被打斷就是false,跳轉(zhuǎn)到4.0中繼續(xù)
}
?unlock方法:
0.0 unlock
//
public void unlock() {
sync.release(1);
}
1.0 release
// 釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) { // 細(xì)節(jié)見1.1 此時(shí)執(zhí)行結(jié)果為true
Node h = head; // 獲取頭節(jié)點(diǎn)
if (h != null && h.waitStatus != 0) // 頭節(jié)點(diǎn)不等于null,且頭節(jié)點(diǎn)wait!=0,(之前修改為SIGNAL -1)
unparkSuccessor(h); // 細(xì)節(jié)見1.2,傳入?yún)?shù)為頭節(jié)點(diǎn)
return true;
}
return false;
}
1.1
// tryRelease
protected final boolean tryRelease(int releases) { // releases = 1
int c = getState() - releases; // c =0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 進(jìn)入這里
free = true; // free = true
setExclusiveOwnerThread(null);
}
setState(c);
return free; // 返回true
}
1.2 unparkSuccessor
//
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 頭節(jié)點(diǎn)進(jìn)來(lái)之后,會(huì)將wait的-1改為0
Node s = node.next; // 獲取頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),即第一個(gè)真實(shí)任務(wù)節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) { // 節(jié)點(diǎn)不為null,不進(jìn)來(lái)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 節(jié)點(diǎn)不為null,進(jìn)入這里
LockSupport.unpark(s.thread); // 給線程節(jié)點(diǎn)發(fā)放許可,繼續(xù)進(jìn)入到1.0中;此處受影響的還有acquire方法代碼塊的4.3
// 不過(guò)發(fā)放許可并不代表中一定是這個(gè)線程獲取鎖,這就是非公平鎖的精髓,總會(huì)有不講武德插隊(duì)的,獲取許可只是有獲取鎖的可能,是否真的獲取到還要看能否搶到鎖。這里發(fā)放許可,使得被park的線程喚醒了,可以去tryAcquire操作,依然可能會(huì)失敗,繼續(xù)被阻塞
// 這里如果不使用park和unpark,則會(huì)一直執(zhí)行for,導(dǎo)致cpu負(fù)載過(guò)重
}
異常情況
模擬某個(gè)線程不想等待了,想取消排隊(duì)
-
1. 隊(duì)尾元素離開
-
2. 隊(duì)中間的元素離開
-
3. 隊(duì)中間多個(gè)元素離開文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-499790.html
1.0 cancelAcquire方法 // 取消入隊(duì) private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // 將線程賦值null Node pred = node.prev; // 獲取前一個(gè)節(jié)點(diǎn) while (pred.waitStatus > 0) // waitStatus>0意思就是取消狀態(tài),一直向前獲取到一個(gè)不會(huì)取消的頭節(jié)點(diǎn), node.prev = pred = pred.prev; // 最壞情況就是獲取到虛擬頭節(jié)點(diǎn) Node predNext = pred.next; // 前置節(jié)點(diǎn)的next指向當(dāng)前節(jié)點(diǎn)的next node.waitStatus = Node.CANCELLED; // 狀態(tài)設(shè)置為取消 if (node == tail && compareAndSetTail(node, pred)) { // 如果node是隊(duì)尾節(jié)點(diǎn),CAS將pre節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn) compareAndSetNext(pred, predNext, null); // 同時(shí)CAS設(shè)置pred節(jié)點(diǎn)的next為null } else { // 如果不是隊(duì)尾節(jié)點(diǎn) int ws; if (pred != head && // 如果前置節(jié)點(diǎn)不是頭節(jié)點(diǎn)且 ((ws = pred.waitStatus) == Node.SIGNAL || //前置節(jié)點(diǎn)的狀態(tài)為-1或狀態(tài)小于0并且修改為-1成功 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 且pred線程不為空 Node next = node.next; // 獲取當(dāng)前節(jié)點(diǎn)的next if (next != null && next.waitStatus <= 0) // next不為空且status<=0時(shí) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-499790.html
到了這里,關(guān)于AQS源碼分析——以ReentrantLock為例的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!