1.等待多線程完成的 CountDownLatch
CountDownLatch 允許一個(gè)或多個(gè)線程等待其他線程完成操作。
假如有這樣一個(gè)需求:我們需要解析一個(gè) Excel 里多個(gè) sheet 的數(shù)據(jù),此時(shí)可以考慮使用多線程,每個(gè)線程解析一個(gè) sheet 里的數(shù)據(jù),等到所有的 sheet 都解析完之后,程序需要提示解析完成。在這個(gè)需求中,要實(shí)現(xiàn)主線程等待所有線程完成 sheet 的解析操作,最簡(jiǎn)單的做法是使用 join()方法,如代碼清單 1-1 所示。
public class CountDownLatchUseCase {
public static void main(String[] args) throws InterruptedException {
Thread threadA = new Thread(()->{
System.out.println("parser1 finish");
},"threadA");
Thread threadB = new Thread(()->{
System.out.println("parser2 finish");
},"threadB");
// 任務(wù)A B 開(kāi)始執(zhí)行
threadA.start();
threadB.start();
// 等待AB 完成任務(wù)
threadA.join();
threadB.join();
}
}
join 用于讓當(dāng)前執(zhí)行線程等待 join 線程執(zhí)行結(jié)束。其實(shí)現(xiàn)原理是不停檢查 join 線程是否存活,如果 join 線程存活則讓當(dāng)前線程永遠(yuǎn)等待。其中,wait(0)表示永遠(yuǎn)等待下去,代碼片段如下。
// 同步方法
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
// isAlive()方法返回一個(gè)boolean值,如果線程已經(jīng)啟動(dòng)且尚未終止,則返回true;否則,返回false。此次的while循環(huán)為了防止被阻塞的線程被意外喚醒,走出這段循環(huán)代表線程任務(wù)已經(jīng)執(zhí)行完畢,線程已經(jīng)終止。
while (isAlive()) {
// 線程進(jìn)入等待狀態(tài)
wait(0);
}
} else {
// 使用join(long millis)參數(shù)不為0走這部分邏輯,如果線程沒(méi)有即時(shí)被喚醒,超時(shí)時(shí)間過(guò)后自己進(jìn)入RUNNABLE狀態(tài)。
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
直到 join 線程中止后,線程的 this.notifyAll()方法會(huì)被調(diào)用,調(diào)用 notifyAll()方法是在 JVM 里實(shí)現(xiàn)的,所以在 JDK 里看不到。
在 JDK 1.5 之后的并發(fā)包中提供的 CountDownLatch 也可以實(shí)現(xiàn) join 的功能,并且比join 的功能更多,將代碼1-1使用CountDownLatch改下下,如代碼清單 1-2 所示。
public class CountDownLatchUseCase {
private static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
Thread threadA = new Thread(()->{
System.out.println("parser1 finish");
countDownLatch.countDown();
},"threadA");
Thread threadB = new Thread(()->{
System.out.println("parser2 finish");
countDownLatch.countDown();
},"threadB");
threadA.start();
threadB.start();
// 等待任務(wù)完成
countDownLatch.await();
}
}
CountDownLatch 的構(gòu)造函數(shù)接收一個(gè) int 類(lèi)型的參數(shù)作為計(jì)數(shù)器,如果你想等待 N個(gè)點(diǎn)完成,這里就傳入 N。當(dāng)我們調(diào)用 CountDownLatch 的 countDown 方法時(shí),N 就會(huì)減 1,CountDownLatch 的 await 方法會(huì)阻塞當(dāng)前線程,直到 N 變成零。由于 countDown方法可以用在任何地方,所以這里說(shuō)的 N 個(gè)點(diǎn),可以是 N 個(gè)線程,也可以是 1 個(gè)線程里的 N 個(gè)執(zhí)行步驟。用在多個(gè)線程時(shí),只需要把這個(gè) CountDownLatch 的引用傳遞到線程里即可。
如果有某個(gè)解析 sheet 的線程處理得比較慢,我們不可能讓主線程一直等待,所以可以使用另外一個(gè)帶指定時(shí)間的 await 方法——await(long time,TimeUnit unit),這個(gè)方法等待特定時(shí)間后,就會(huì)不再阻塞當(dāng)前線程。join 也有類(lèi)似的方法。
計(jì)數(shù)器必須大于等于 0,只是等于 0 時(shí)候,計(jì)數(shù)器就是零,調(diào)用 await 方法時(shí)不會(huì)阻塞當(dāng)前線程。CountDownLatch 不可能重新初始化或者修改 CountDownLatch對(duì)象的內(nèi)部計(jì)數(shù)器的值。一個(gè)線程調(diào)用 countDown 方法 happen-before,另外一個(gè)線程調(diào)用 await 方法。
2.CountDownLatch原理解析
2.1 構(gòu)造方法 public CountDownLatch(int count)
初始化好AQS同步狀態(tài)為state
// 1.方法 CountDownLatch countDownLatch = new CountDownLatch(2);
# java.util.concurrent.CountDownLatch#CountDownLatch
// 使用構(gòu)造方法將AQS的同步狀態(tài)位初始為count
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
2.2 方法public void countDown()
# 片段1 java.util.concurrent.CountDownLatch#countDown
public void countDown() {
// 將共享狀態(tài)State的值減一
sync.releaseShared(1);
}
# 片段 2 java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
// tryReleaseShared(arg)方法是CountDownLatch 重寫(xiě)的方法,該方法見(jiàn)片段3
if (tryReleaseShared(arg)) {
// 當(dāng)同步狀態(tài)為0時(shí)會(huì)執(zhí)行這個(gè)方法
doReleaseShared();
return true;
}
return false;
}
# 片段 3 java.util.concurrent.CountDownLatch.Sync#tryReleaseShared 模板方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 自旋+CAS 更新同步狀態(tài)
for (;;) {
// 獲得同步狀態(tài)
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 如果同步更新后的值為0放回true
return nextc == 0;
}
}
# 片段 4 java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// 獲得同步隊(duì)列頭節(jié)點(diǎn)
Node h = head;
// 如果頭節(jié)點(diǎn)存在且不等于尾節(jié)點(diǎn)(tail),進(jìn)入if里
if (h != null && h != tail) {
// 獲得頭節(jié)點(diǎn)的等待狀態(tài)
int ws = h.waitStatus;
// 判斷等待狀態(tài)是否為Node.SIGNAL,等待狀態(tài)為SIGNAL表示需要喚醒后繼節(jié)點(diǎn)
if (ws == Node.SIGNAL) {
//使用compareAndSetWaitStatus(h, Node.SIGNAL, 0)方法來(lái)嘗試將頭節(jié)點(diǎn)的等待狀態(tài)從Node.SIGNAL設(shè)置為0。如果設(shè)置成功,則調(diào)用unparkSuccessor(h)方法喚醒后繼節(jié)點(diǎn)。如果設(shè)置失敗,則繼續(xù)循環(huán)重新檢查情況。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒后續(xù)節(jié)點(diǎn) 見(jiàn)代碼片段5
unparkSuccessor(h);
}
//如果等待狀態(tài)為0,并且使用compareAndSetWaitStatus(h, 0, Node.PROPAGATE)方法嘗試將頭節(jié)點(diǎn)的等待狀態(tài)從0設(shè)置為Node.PROPAGATE,則繼續(xù)循環(huán)。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//頭節(jié)點(diǎn)不變退出循環(huán)
if (h == head) // loop if head changed
break;
}
}
# 片段5 java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果等待狀態(tài)ws小于0,嘗試將等待狀態(tài)清除為0,忽略修改失敗和其他線程可能對(duì)等待狀態(tài)的更改。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 獲得頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 循環(huán)遍歷從尾節(jié)點(diǎn)開(kāi)始,直到找到非取消狀態(tài)的節(jié)點(diǎn)或者已經(jīng)遍歷到給定節(jié)點(diǎn)node為止。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 對(duì)首個(gè)等待線程進(jìn)行喚醒
LockSupport.unpark(s.thread);
}
2.3 方法countDownLatch.await()
# 片段1 java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
// 見(jiàn)代碼片段2
sync.acquireSharedInterruptibly(1);
}
# 片段2 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判斷線程是否被中斷,中斷就拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 調(diào)用tryAcquireShared(arg)獲取同步狀態(tài)信息。見(jiàn)代碼片段3
if (tryAcquireShared(arg) < 0)
// 同步狀態(tài)不為0執(zhí)行,doAcquireSharedInterruptibly(arg)見(jiàn)代碼片段4
doAcquireSharedInterruptibly(arg);
}
# 片段3 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
protected int tryAcquireShared(int acquires) {
// 同步狀態(tài)是否為0
return (getState() == 0) ? 1 : -1;
}
# 片段4 java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 將一個(gè)代表當(dāng)前線程的等待節(jié)點(diǎn)node添加到同步隊(duì)列中。
final Node node = addWaiter(Node.SHARED);
// 設(shè)置一個(gè)標(biāo)志變量failed,用于跟蹤獲取許可的過(guò)程是否失敗。
boolean failed = true;
try {
// 循環(huán)
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)多的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 判斷前驅(qū)節(jié)點(diǎn)是否為頭節(jié)點(diǎn)
if (p == head) {
//見(jiàn)代碼片段3。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 同步狀態(tài)為0 代表new CountDownLatch(int count) 傳入的count已經(jīng)被countDown()減少完了,代表任務(wù)都執(zhí)行完了。
//設(shè)置頭節(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)node,并觸發(fā)傳播操作,將共享許可傳播給后繼節(jié)點(diǎn)。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire(p, node) 方法判斷同步狀態(tài)不為0時(shí)是否需要阻塞線程等待,返回true時(shí)繼續(xù)執(zhí)行parkAndCheckInterrupt()方法阻塞當(dāng)前調(diào)用await()的線程,直到有線程調(diào)用countDown()將同步狀態(tài)state更新為0時(shí),調(diào)用 unparkSuccessor(Node node)方法將當(dāng)前線程喚醒。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.總結(jié)
CountDownLatch使用主要圍繞CountDownLatch.countDown()方法和CountDownLatch.await()方法,使用時(shí)先通過(guò)CountDownLatch的構(gòu)造方法設(shè)置共享同步變量state的大小,然后讓其他“工作”線程完成工作任務(wù)時(shí)調(diào)用countDown方法將state-1,讓“等待”線程調(diào)用await()方法等待“工作”線程完成任務(wù),當(dāng)某個(gè)“工作”線程完成最后一個(gè)任務(wù),并且將state更新為0時(shí),這個(gè)“工作”線程會(huì)去喚醒同步隊(duì)列中的第一個(gè)“等待”線程,首個(gè)“等待”線程被喚醒會(huì)“通知”其他“等待”線程,然后他們從await()方法方法,繼續(xù)執(zhí)行。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-597337.html
參考 《Java并發(fā)編程的藝術(shù)》文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-597337.html
到了這里,關(guān)于Java并發(fā)工具CountDownLatch的使用和原理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!