系列文章目錄
JUC篇:volatile可見性的實(shí)現(xiàn)原理
JUC篇:synchronized的應(yīng)用和實(shí)現(xiàn)原理
JUC篇:用Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池
JUC篇:java中的線程池
JUC篇:ThreadLocal的應(yīng)用與原理
前言
在JDK的并發(fā)包里提供了幾個(gè)非常有用的并發(fā)工具類。CountDownLatch、CyclicBarrier和
Semaphore工具類提供了一種并發(fā)流程控制的手段。本文會(huì)配合一些應(yīng)用場(chǎng)景來介紹如何使用這些工具類并解析其中的原理。
一、等待多線程完成的CountDownLatch
CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作。
在日常開發(fā)中經(jīng)常會(huì)遇到需要在主線程中開啟多個(gè)線程去并行執(zhí)行任務(wù),并且主線程需要等待所有子線程執(zhí)行完畢后再進(jìn)行匯總的場(chǎng)景。在CountDownLatch出現(xiàn)之前一般都使用線程的join()方法來實(shí)現(xiàn)這一點(diǎn),但是join方法不夠靈活,不能夠滿足不同場(chǎng)景的需要,所以JDK開發(fā)組提供了CountDownLatch這個(gè)類
1.1案例介紹
假如有這樣一個(gè)需求:我們需要解析一個(gè)Excel里多個(gè)sheet的數(shù)據(jù),此時(shí)可以考慮使用多線程,每個(gè)線程解析一個(gè)sheet里的數(shù)據(jù),等到所有的sheet都解析完之后,程序需要提示解析完成。在這個(gè)需求中,要實(shí)現(xiàn)主線程等待所有線程完成sheet的解析操作,最簡(jiǎn)單的做法是使用join()方法
//用join()實(shí)現(xiàn)主線程等待全部子線程執(zhí)行完之后再執(zhí)行
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(()->{
System.out.println("線程1....start");
},"線程1");
Thread thread2 = new Thread(() -> {
System.out.println("線程2.。。。start");
}, "線程2");
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("主線程。。。。。start");
}
}
//用CountDownLatch實(shí)現(xiàn)主線程等待全部子線程執(zhí)行完之后再執(zhí)行
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
AtomicInteger res= new AtomicInteger();
Thread thread1 = new Thread(()->{
System.out.println("線程1....start");
for (int i=0;i<10000;i++){
res.incrementAndGet();
}
countDownLatch.countDown();
},"線程1");
Thread thread2 = new Thread(() -> {
System.out.println("線程2.。。。start");
for (int i=0;i<10000;i++){
res.incrementAndGet();
}
countDownLatch.countDown();
}, "線程2");
thread1.start();
thread2.start();
countDownLatch.await();
System.out.println("主線程。。。。。start"+res);
}
}
總結(jié)CountDownLatch與join方法的區(qū)別
- 一個(gè)區(qū)別是,調(diào)用一個(gè)子線程的join()方法后,該線程會(huì)一直被阻塞直到子線程運(yùn)行完畢,而CountDownLatch則使用計(jì)數(shù)器來允許子線程運(yùn)行完畢或者在運(yùn)行中遞減計(jì)數(shù),也就是CountDownLatch可以在子線程運(yùn)行的任何時(shí)候讓await方法返回而不一定必須等到線程結(jié)束。
- 另外,使用線程池來管理線程時(shí)一般都是直接添加Runable到線程池,這時(shí)候就沒有辦法再調(diào)用線程的join方法了,就是說countDownLatch相比join方法讓我們對(duì)線程同步有更靈活的控制。
1.2實(shí)現(xiàn)原理
CountDownLatch的名字就可以猜測(cè)其內(nèi)部應(yīng)該有個(gè)計(jì)數(shù)器,并且這個(gè)計(jì)數(shù)器是遞減的。下面就通過源碼看看JDK開發(fā)組在何時(shí)初始化計(jì)數(shù)器,在何時(shí)遞減計(jì)數(shù)器,當(dāng)計(jì)數(shù)器變?yōu)?日才做了什么操作,多個(gè)線程是如何通過計(jì)時(shí)器值實(shí)現(xiàn)同步的。為了一覽CountDownLatch的內(nèi)部結(jié)構(gòu),我們先看它的類圖
從類圖可以看出,CountDownLatch是使用AQS實(shí)現(xiàn)的。通過下面的構(gòu)造函數(shù),你會(huì)發(fā)現(xiàn),實(shí)際上是把計(jì)數(shù)器的值賦給了AQS的狀態(tài)變量state,也就是這里使用AQS的狀態(tài)值來表示計(jì)數(shù)器值。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
下面我們來研究CountDownLatch中的幾個(gè)重要的方法,看它們是如何調(diào)用AQS來實(shí)現(xiàn)功能的。
1.2.1void await()方法
當(dāng)線程調(diào)用CountDownLatch對(duì)象的await方法后,當(dāng)前線程會(huì)被阻塞,直到下面的情況之一發(fā)生才會(huì)返回
-
當(dāng)所有線程都調(diào)用了CountDownLatch對(duì)象的countDown方法后,也就是計(jì)數(shù)器的值為0時(shí)
-
其他線程調(diào)用了當(dāng)前線程的interrupt()方法中斷了當(dāng)前線程,當(dāng)前線程就會(huì)拋出InterruptedException異常,然后返回
下面看下在await()方法內(nèi)部是如何調(diào)用AQS的方法的。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
從以上代碼可以看到,await()方法委托sync調(diào)用了AQS的acquiresharedlnterruptibIy方法,后者的代碼如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//線程被中斷拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//查看計(jì)數(shù)器值是否為0,為0直接返回,不為0進(jìn)入AQS的隊(duì)列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
由如上代碼可知,該方法的特點(diǎn)是線程獲取資源時(shí)可以被中斷,并且獲取的資源是共享資源。acquireSharedInterruptibly首先判斷當(dāng)前線程是否己被中斷,若是則拋出異常,否則調(diào)用sync實(shí)現(xiàn)的tryAcquireShared方法查看當(dāng)前狀態(tài)值(計(jì)數(shù)器值)是否為0,是則
當(dāng)前線程的await()方法直接返回,否則調(diào)用AQS的doAcquireSharedlnterruptibly方法讓當(dāng)前線程阻塞。
另外可以看到,這里tryAcquireShared傳遞的arg參數(shù)沒有被用到,調(diào)用tryAcquireShared的方法僅僅是為了檢查當(dāng)前狀態(tài)值是不是為0,并沒有調(diào)用CAS讓當(dāng)前狀態(tài)值減1
1.2.2.void countDown()方法
線程調(diào)用該方法后,計(jì)數(shù)器的值遞減,遞減后如果計(jì)數(shù)器值為0則喚醒所有因調(diào)用await方法而被阻塞的線程,否則什么都不做。下面看下countDown()方法是如何調(diào)用AQS的方法的。
public void countDown() {
//委托sync調(diào)用AQS的方法
sync.releaseShared(1);
}
由如上代碼可知,CountDownLatch的countDown()方法委托sync調(diào)用了AQS的releaseShared方法,后者的代碼如下。
public final boolean releaseShared(int arg) {
//調(diào)用sync實(shí)現(xiàn)的tryReleaseShared
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在如上代碼中,releaseShared首先調(diào)用了sync實(shí)現(xiàn)的AQS的tryReleaseShared方法,其代碼如下。
protected boolean tryReleaseShared(int releases) {
//循環(huán)進(jìn)行CAS,直到當(dāng)前線程成功完成CAS使計(jì)數(shù)器值(狀態(tài)值state)減l并更新圭1]state
for (;;) {
int c = getState();
//如果當(dāng)前狀態(tài)值為0,直接返回(1)
if (c == 0)
return false;
int nextc = c-1;
//使用CAS讓計(jì)數(shù)器減1(2)
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
如上代碼首先獲取當(dāng)前狀態(tài)值(計(jì)數(shù)器值)。代碼(1)判斷如果當(dāng)前狀態(tài)值為0則直接返回false,從而countDown()方法直接返回:否則執(zhí)行代碼(2)使用CAS將計(jì)數(shù)器值減1,CAS失敗則循環(huán)重試,否則如果當(dāng)前計(jì)數(shù)器值為0則返回true,返回true說明是最后一個(gè)線程調(diào)用的countdown方法,那么該線程除了讓計(jì)數(shù)器值減1外,還需要喚醒因調(diào)用CountDownLatch的await方法而被阻塞的線程,具體是調(diào)用AQS的doReleaseShared方法來激活阻塞的線程。這里代碼(1)貌似是多余的,其實(shí)不然,之所以添加代碼(1)是為了防止當(dāng)計(jì)數(shù)器值為0后,其他線程又調(diào)用了countDown方法,如果沒有代碼(1)狀態(tài)值就可能會(huì)變成負(fù)數(shù)。
1.3小結(jié)
本節(jié)首先介紹了CountDownLatch的使用,相比使用join方法來實(shí)現(xiàn)線程間同步,前者更具有靈活性和方便性。
另外還介紹了CountDownLatch的原理,CountDownLatch是使用AQS實(shí)現(xiàn)的。使用AQS的狀態(tài)變量來存放計(jì)數(shù)器的值。
首先在初始化CountDownLatch時(shí)設(shè)置狀態(tài)值(計(jì)數(shù)器值),當(dāng)多個(gè)線程調(diào)用countdown方法時(shí)實(shí)際是原子性遞減AQS的狀態(tài)值。當(dāng)線程調(diào)用await方法后當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列等待計(jì)數(shù)器為0再返回。其他線程調(diào)用countdown方法讓計(jì)數(shù)器值遞減1,當(dāng)計(jì)數(shù)器值變?yōu)?時(shí),當(dāng)前線程還要調(diào)用AQS的doReleaseShared方法來激活由于調(diào)用await()方法而被阻塞的線程。
二、同步屏障CyclicBarrier
CountDownLatch在解決多個(gè)線程同步方面相對(duì)于調(diào)用線程的join方法己經(jīng)有了不少優(yōu)化,但是CountDownLatch的計(jì)數(shù)器是一次性的,也就是等到計(jì)數(shù)器值變?yōu)?后,再調(diào)用CountDownLatch的await和countdown方法都會(huì)立刻返回,這就起不到線程同步的效果了。
所以為了滿足計(jì)數(shù)器可以重置的需要,JDK開發(fā)組提供了CyclicBarrier類,并且CyclicBarrier類的功能并不限于CountDownLatch的功能。從字面意思理解,CyclicBarrier是回環(huán)屏障的意思,它可以讓一組線程全部達(dá)到一個(gè)狀態(tài)后再全部同時(shí)執(zhí)行。這里之所以叫作回環(huán)是因?yàn)楫?dāng)所有等待線程執(zhí)行完畢,并重置CyclicBarrier的狀態(tài)后它可以被重用。之所以叫作屏障是因?yàn)榫€程調(diào)用await方法后就會(huì)被阻塞,這個(gè)阻塞點(diǎn)就稱為屏障點(diǎn),等所有線程都調(diào)用了await方法后,線程們就會(huì)沖破屏障,繼續(xù)向下運(yùn)行。
2.1案例介紹
假設(shè)一個(gè)任務(wù)由階段1、階段2和階段3組成,每個(gè)線程要串行地執(zhí)行階段1、階段2和階段3,當(dāng)多個(gè)線程執(zhí)行該任務(wù)時(shí)必須要保證所有線程的階段1全部完成后才能進(jìn)入階段2執(zhí)行,當(dāng)所有線程的階段2全部完成后才能進(jìn)入階段3執(zhí)行。下面使用CyclicBarrier來完成這個(gè)需求。
public class CyclicBarrierDemo1Test {
//假設(shè)一個(gè)任務(wù)由階段1、階段2和階段3組成,每個(gè)線程要串行地執(zhí)行階段1、階段2和階段3,
// 當(dāng)多個(gè)線程執(zhí)行該任務(wù)時(shí)必須要保證所有線程的階段1全部完成后才能進(jìn)入階段2執(zhí)行,當(dāng)所有線程的階段2全部完成后才能進(jìn)入階段3執(zhí)行。
// 下面使用CyclicBarrier來完成這個(gè)需求。
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
ExecutorService executorService = Executors.newFixedThreadPool(2);
//線程一
executorService.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+":step-1");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+":step-2");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+":step-3");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
//線程二
executorService.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+":step-1");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+":step-2");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+":step-3");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
//關(guān)閉線程池
executorService.shutdown();
}
}
輸出結(jié)果
在如上代碼中,每個(gè)子線程在執(zhí)行完階段1后都調(diào)用了await方法,等到所有線程都到達(dá)屏障點(diǎn)后才會(huì)一塊往下執(zhí)行,這就保證了所有線程都完成了階段1后才會(huì)開始執(zhí)行階段2。然后在階段2后面調(diào)用了await方法,這保證了所有線程都完成了階段2后,才能開始階段3的執(zhí)行。這個(gè)功能使用單個(gè)CountDownLatch是無法完成的。
2.2實(shí)現(xiàn)原理
CyclicBarrier的類圖結(jié)構(gòu)
由以上類圖可知,CyclicBrrier基于獨(dú)占鎖實(shí)現(xiàn),本質(zhì)底層還是基于AQS的。parties用來記錄線程個(gè)數(shù),這里表示多少線程調(diào)用await后,所有線程才會(huì)沖破屏障繼續(xù)往下運(yùn)行。而count一開始等于parties,每當(dāng)有線程調(diào)用await方法就遞減1,當(dāng)count為0時(shí)就表示所有線程都到了屏障點(diǎn)。
你可能會(huì)疑惑,為何維護(hù)parties和count兩個(gè)變量,只使用count不就可以了?
別忘了CyclicBrrier是可以被復(fù)用的,使用兩個(gè)變量的原因是,parties始終用來記錄總的線程個(gè)數(shù),當(dāng)count計(jì)數(shù)器值變?yōu)?后,會(huì)將parties的值賦給count,從而進(jìn)行復(fù)用。這兩個(gè)變量是在構(gòu)造CyclicBarrier對(duì)象時(shí)傳遞的,如下所示。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
還有一個(gè)變量barrierCommand也通過構(gòu)造函數(shù)傳遞,這是一個(gè)任務(wù),這個(gè)任務(wù)的執(zhí)行時(shí)機(jī)是當(dāng)所有線程都到達(dá)屏障點(diǎn)后。
使用lock首先保證了更新計(jì)數(shù)器count的原子性。
另外使用lock的條件變量trip支持線程間使用await和signal操作進(jìn)行同步。
最后,在變量generation內(nèi)部有一個(gè)變量broken,其用來記錄當(dāng)前屏障是否被打破。
注意,這里的broken并沒有被聲明為volatile的,因?yàn)槭窃阪i內(nèi)使用變量,所以不需要聲明。
private static class Generation {
boolean broken = false;
}
下面來看CyclicBarrier中的幾個(gè)重要的方法。
2.2.1int await()方法
當(dāng)前線程調(diào)用CyclicBarrier的該方法時(shí)會(huì)被阻塞,直到滿足下面條件之一才會(huì)返回:
-
parties個(gè)線程都調(diào)用了await()方法,也就是線程都到了屏障點(diǎn);
-
其他線程調(diào)用了當(dāng)前線程的interrupt()方法中斷了當(dāng)前線程,則當(dāng)前線程會(huì)拋出InterruptedException異常而返回;
-
與當(dāng)前屏障點(diǎn)關(guān)聯(lián)的Generation對(duì)象的broken標(biāo)志被設(shè)置為true時(shí),會(huì)拋出BrokenBarrierException異常,然后返回。
由如下代碼可知,在內(nèi)部調(diào)用了dowait方法。第一個(gè)參數(shù)為false則說明不設(shè)置超時(shí)時(shí)間,這時(shí)候第二個(gè)參數(shù)沒有意義。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
2.2.2int dowait(boolean timed,long nanos)方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
//如果index==O則說明所有線程都到了屏障點(diǎn),此時(shí)執(zhí)行初始化時(shí)傳遞的任務(wù)
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//(2)執(zhí)行任務(wù)
if (command != null)
command.run();
ranAction = true;
//(3)激活其他因調(diào)用await方法而被阻塞的線程,并重置CyclicBarrier
nextGeneration();
//返回
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// (4)如果index!=0
for (;;) {
try {
//(5)沒有設(shè)置超時(shí)時(shí)間
if (!timed)
trip.await();
//(6)設(shè)置了超時(shí)
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// (7)喚醒等待隊(duì)列里面阻塞的線程
trip.signalAll();
// (8)重置CyclicBarrier
count = parties;
generation = new Generation();
}
以上是dowait方法的主干代碼。當(dāng)一個(gè)線程調(diào)用了dowait方法后,首先會(huì)獲取獨(dú)占鎖lock,如果創(chuàng)建CycleBarrier時(shí)傳遞的參數(shù)為10,那么后面9個(gè)調(diào)用錢程會(huì)被阻塞。然后當(dāng)前獲取到鎖的線程會(huì)對(duì)計(jì)數(shù)器count進(jìn)行遞減操作,遞減后count=index=9,因?yàn)?br> index!=0所以當(dāng)前線程會(huì)執(zhí)行代碼(4)。
如果當(dāng)前線程調(diào)用的是無參數(shù)的await()方法,則這里timed=false,所以當(dāng)前線程會(huì)被放入條件變量的p的條件阻塞隊(duì)列,當(dāng)前線程會(huì)被掛起并釋放獲取的lock鎖。如果調(diào)用的是有參數(shù)的await方法則timed=true,然后當(dāng)前線程也會(huì)被放入條件變量的條件隊(duì)列并釋放鎖資源,不同的是當(dāng)前線程會(huì)在指定時(shí)間超時(shí)后自動(dòng)被激活。
當(dāng)?shù)谝粋€(gè)獲取鎖的線程由于被阻塞釋放鎖后,被阻塞的9個(gè)線程中有一個(gè)會(huì)競(jìng)爭(zhēng)到lock鎖,然后執(zhí)行與第一個(gè)線程同樣的操作,直到最后一個(gè)線程獲取到lock鎖,此時(shí)己經(jīng)有9個(gè)線程被放入了條件變量trip的條件隊(duì)列里面。最后count=index等于0,所以執(zhí)行
代碼(2),如果創(chuàng)建CyclicBarrier時(shí)傳遞了任務(wù),則在其他線程被喚醒前先執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后再執(zhí)行代碼(3),喚醒其他9個(gè)線程,并重置CyclicBarrier,然后這10個(gè)線程就可以繼續(xù)向下運(yùn)行了。
2.3小結(jié)
本節(jié)首先通過案例說明了CycleBarrier與CountDownLatch的不同在于,前者是可以復(fù)用的,并且前者特別適合分段任務(wù)有序執(zhí)行的場(chǎng)景。然后分析了CycleBarrier,其通過獨(dú)占鎖ReentrantLock實(shí)現(xiàn)計(jì)數(shù)器原子性更新,并使用條件變量隊(duì)列來實(shí)現(xiàn)線程同步。
三.控制并發(fā)線程數(shù)的Semaphore
Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。
3.1案例介紹
Semaphore可以用于做流量控制,特別是公用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫(kù)連接。假如有一個(gè)需求,要讀取幾萬個(gè)文件的數(shù)據(jù),因?yàn)槎际荌O密集型任務(wù),我們可以啟動(dòng)幾十個(gè)線程并發(fā)地讀取,但是如果讀到內(nèi)存后,還需要存儲(chǔ)到數(shù)據(jù)庫(kù)中,而數(shù)據(jù)庫(kù)的連接數(shù)只有10個(gè),這時(shí)我們必須控制只有10個(gè)線程同時(shí)獲取數(shù)據(jù)庫(kù)連接保存數(shù)據(jù),否則會(huì)報(bào)錯(cuò)無法獲取數(shù)據(jù)庫(kù)連接。這個(gè)時(shí)候,就可以使用Semaphore來做流量控制
public class SemaphoreTest {
private static final int THREAD_COUNT=30;
private static final ExecutorService THREAD_POOL= Executors.newFixedThreadPool(THREAD_COUNT);
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i <THREAD_COUNT ; i++) {
THREAD_POOL.execute(()->{
try {
semaphore.acquire();
System.out.println("save data");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
THREAD_POOL.shutdown();
}
}
3.2實(shí)現(xiàn)原理
Semaphore的類圖
由該類圖可知,Semaphore還是使用AQS實(shí)現(xiàn)的。Sync只是對(duì)AQS的一個(gè)修飾,并且Sync有兩個(gè)實(shí)現(xiàn)類,用來指定獲取信號(hào)量時(shí)是否采用公平策略。例如,下面的代碼在創(chuàng)建Semaphore時(shí)會(huì)使用一個(gè)變量指定是否使用公平策略。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
在如上代碼中,Semaphore默認(rèn)采用非公平策略,如果需要使用公平策略則可以使用帶兩個(gè)參數(shù)的構(gòu)造函數(shù)來構(gòu)造Semaphore對(duì)象。另外,如CountDownLatch構(gòu)造函數(shù)傳遞的初始化信號(hào)量個(gè)數(shù)permits被賦給了AQS的state狀態(tài)變量一樣,這里AQS的state值也
表示當(dāng)前持有的信號(hào)量個(gè)數(shù)。
下面來看Semaphore實(shí)現(xiàn)的主要方法。
3.2.1void acquire()方法
當(dāng)前線程調(diào)用該方法的目的是希望獲取一個(gè)信號(hào)量資源。如果當(dāng)前信號(hào)量個(gè)數(shù)大于o,則當(dāng)前信號(hào)量的計(jì)數(shù)會(huì)減1,然后該方法直接返回。否則如果當(dāng)前信號(hào)量個(gè)數(shù)等于0,則當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列。當(dāng)其他線程調(diào)用了當(dāng)前線程的interrupt()方法中
斷了當(dāng)前線程時(shí),則當(dāng)前線程會(huì)拋出InterruptedException異常返回。下面看下代碼實(shí)現(xiàn)。
public void acquire() throws InterruptedException {
//傳遞參數(shù)為1,說明要獲取1個(gè)信號(hào)量資源
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)線程被中斷,拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//(2)否則調(diào)用Sync子類方法嘗試獲取,這里根據(jù)構(gòu)造函數(shù)確定使用公平策略
if (tryAcquireShared(arg) < 0)
//如果獲取失敗則放入阻塞隊(duì)列。然后再次嘗試,如果失敗則調(diào)用park方法掛起當(dāng)前線
doAcquireSharedInterruptibly(arg);
}
由如上代碼可知,acquire()在內(nèi)部調(diào)用了Sync的acquireSharedlnterruptibly方法,后者會(huì)對(duì)中斷進(jìn)行響應(yīng)(如果當(dāng)前線程被中斷,則拋出中斷異常)。嘗試獲取信號(hào)量資源的AQS的方法tryAcquireShared是由Sync的子類實(shí)現(xiàn)的,所以這里分別從兩方面來討論。
先討論非公平策略NonfairSync類的t叩A(chǔ)cquireShared方法,代碼如下
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取當(dāng)前信號(hào)量
int available = getState();
//計(jì)算當(dāng)前剩余值
int remaining = available - acquires;
///如果當(dāng)前剩余位小于0或者CAS設(shè)置成功則返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
如上代碼先獲取當(dāng)前信號(hào)量值(available),然后減去需要獲取的值(acquires),得到剩余的信號(hào)量個(gè)數(shù)(remaining),如果剩余值小于0則說明當(dāng)前信號(hào)量個(gè)數(shù)滿足不了需求,那么直接返回負(fù)數(shù),這時(shí)當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列而被掛起。如果剩余值大于0,則使用CAS操作設(shè)置當(dāng)前信號(hào)量值為剩余值,然后返回剩余值。
另外,由于NonFairSync是非公平獲取的,也就是說先調(diào)用aquire方法獲取信號(hào)量的線程不一定比后來者先獲取到信號(hào)量??紤]下面場(chǎng)景,如果線程A先調(diào)用了aquire()方法獲取信號(hào)量,但是當(dāng)前信號(hào)量個(gè)數(shù)為0,那么線程A會(huì)被放入AQS的阻塞隊(duì)列。過一
段時(shí)間后線程C調(diào)用了release()方法釋放了一個(gè)信號(hào)量,如果當(dāng)前沒有其他線程獲取信號(hào)量,那么線程A就會(huì)被激活,然后獲取該信號(hào)量,但是假如線程C釋放信號(hào)量后,線程C調(diào)用了aquire方法,那么線程C就會(huì)和線程A去競(jìng)爭(zhēng)這個(gè)信號(hào)量資源。如果采用非公平策略,由nonfairTryAcquireShared的代碼可知,線程C完全可以在線程A被激活前,或者激活后先于線程A獲取到該信號(hào)量,也就是在這種模式下阻塞線程和當(dāng)前請(qǐng)求的線程是競(jìng)爭(zhēng)關(guān)系,而不遵循先來先得的策略。下面看公平性的FairSync類是如何保證公平性的。
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
可見公平性還是靠hasQueuedPredecessors這個(gè)函數(shù)來保證的。前面章節(jié)講過,公平策略是看當(dāng)前線程節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否也在等待獲取該資源,如果是則自己放棄獲取的權(quán)限,然后當(dāng)前線程會(huì)被放入AQS阻塞隊(duì)列,否則就去獲取。
3.2.2void acquire(int permits)方法
該方法與acquire()方法不同,后者只需要獲取一個(gè)信號(hào)量值,而前者則獲取permits個(gè)。
3.2.3void acquireUninterruptibly()方法
該方法與acquire()類似,不同之處在于該方法對(duì)中斷不響應(yīng),也就是當(dāng)當(dāng)前線程調(diào)用了acquireUninterruptibly獲取資源時(shí)(包含被阻塞后),其他線程調(diào)用了當(dāng)前線程的interrupt()方法設(shè)置了當(dāng)前線程的中斷標(biāo)志,此時(shí)當(dāng)前線程并不會(huì)拋出InterruptedException異常而返回。
3.2.4void acquireUninterruptibly(intpermits)方法
該方法與acquire(intpermits)方法的不同之處在于,該方法對(duì)中斷不響應(yīng)。
3.2.5void release()方法
該方法的作用是把當(dāng)前Semaphore對(duì)象的信號(hào)量值增加1,如果當(dāng)前有線程因?yàn)檎{(diào)用aquire方法被阻塞而被放入了AQS的阻塞隊(duì)列,則會(huì)根據(jù)公平策略選擇一個(gè)信號(hào)量個(gè)數(shù)能被滿足的線程進(jìn)行激活,激活的線程會(huì)嘗試獲取剛增加的信號(hào)量,下面看代碼實(shí)現(xiàn)。
public void release() {
//(1)arg=1
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//(2)嘗試釋放資源
if (tryReleaseShared(arg)) {
//(3)資源釋放成功則調(diào)用park方法喚醒AQS隊(duì)列里面最先掛起的線程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//(4)獲取當(dāng)前信號(hào)量
int current = getState();
//(5)將當(dāng)前信號(hào)量+releases,這里加1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//(6)使用CAS保證更新信號(hào)量的原子性
if (compareAndSetState(current, next))
return true;
}
}
由代碼release()->sync.releaseShared(1)可知,release方法每次只會(huì)對(duì)信號(hào)量值增加1,tryReleaseShared方法是無限循環(huán),使用CAS保證了release方法對(duì)信號(hào)量遞增1的原子性操作。tryReleaseShared方法增加信號(hào)量值成功后會(huì)執(zhí)行代碼(3),即調(diào)用AQS的方法來激活因?yàn)檎{(diào)用aquire方法而被阻塞的線程。
3.3小結(jié)
本節(jié)首先通過案例介紹了Semaphore的使用方法.然后介紹了Semaphore的源碼實(shí)現(xiàn),Semaphore也是使用AQS實(shí)現(xiàn)的,并且獲取信號(hào)量時(shí)有公平策略和非公平策略之分。文章來源:http://www.zghlxwxcb.cn/news/detail-423740.html
總結(jié)
本文介紹了并發(fā)包中關(guān)于線程協(xié)作的一些重要類。文章來源地址http://www.zghlxwxcb.cn/news/detail-423740.html
- 首先CountDownLatch通過計(jì)數(shù)器提供了更靈活的控制,只要檢測(cè)到計(jì)數(shù)器值為0,就可以往下執(zhí)行,這相比使用join必須等待線程執(zhí)行完畢后主線程才會(huì)繼續(xù)向下運(yùn)行更靈活。
- 另外,CyclicBarrier也可以達(dá)到CountDownLatch的效果,但是后者在計(jì)數(shù)器值變?yōu)?后,就不能再被復(fù)用,而前者則可以使用reset方法重置后復(fù)用,前者對(duì)同一個(gè)算法但是輸入?yún)?shù)不同的類似場(chǎng)景比較適用。
- 最后介紹了Semaphore的使用方法.然后介紹了Semaphore的源碼實(shí)現(xiàn),Semaphore也是使用AQS實(shí)現(xiàn)的,并且獲取信號(hào)量時(shí)有公平策略和非公平策略之分。
- 使用本章介紹的類會(huì)大大減少你在Java中使用wait、notify等來實(shí)現(xiàn)線程同步的代碼量,在日常開發(fā)中當(dāng)需要進(jìn)行線程同步時(shí)使用這些同步類會(huì)節(jié)省很多代碼并且可以保證正確性。
到了這里,關(guān)于Java中的并發(fā)工具類的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!