前文了解了線程通信方式中的CountDownLatch, Condition,ReentrantLock以及CyclicBarrier,接下來我們繼續(xù)了解其他的線程間通信方式。
Phaser
Phaser是JDK1.7中引入的一種功能上和CycliBarrier和CountDownLatch相似的同步工具,相對(duì)這兩者而言其用法更加靈活,同時(shí)Phaser也支持重用。
在Phaser中將需要協(xié)作完成的任務(wù)分成多個(gè)階段,每個(gè)階段的參與者可指定,參與者可以隨時(shí)注冊(cè)并參與到某個(gè)階段或者取消參與本階段。以選修課考試為例,說明Phaser的工作邏輯,假設(shè)現(xiàn)有選修課3門,政治,歷史,地理,各選修人數(shù)分別為20,10,10.按Phaser實(shí)現(xiàn)考試邏輯如下:
- 第一階段考政治,總共應(yīng)有9名同學(xué)參加考試,在考試開始時(shí),8位同學(xué)開始答題,另外一位同學(xué)未到,考試中途,最后一位同學(xué)進(jìn)入,開始考試,所有同學(xué)答題完成后,政治考試結(jié)束
- 第二階段考?xì)v史,總共9名同學(xué)參考考試,在考試結(jié)束前,3名同學(xué)棄考,則實(shí)際參與考試有6名同學(xué),所有同學(xué)答題完成后,歷史考試結(jié)束
- 第三階段考地理,總共9名同學(xué)參與考試,中途無意外,所有同學(xué)答題完成后,地理考試結(jié)束
至此選修課考試的三個(gè)階段均完成,所以選修課考試這個(gè)任務(wù)結(jié)束,其中第一階段中晚到參考考試的同學(xué)說的就是參與者可以隨時(shí)注冊(cè)并參與到某個(gè)階段,第二階段中棄考的同學(xué)說的就是參與者可以隨時(shí)取消參與本階段,當(dāng)所有參與本階段的參與者均取消,則意味著該階段完成。
在Phaser中,針對(duì)一個(gè)階段而言,每一個(gè)參與者都被稱為一個(gè)party,可以通過構(gòu)造函數(shù)指定參與者數(shù)量,也可以通過register使parties(party的總和)自增,當(dāng)當(dāng)前階段的所有參與者等于parties的數(shù)量時(shí),此時(shí)phase自增1,進(jìn)入下一個(gè)階段,回調(diào)onAdvance方法
Phaser提供的核心函數(shù)如下所示:
函數(shù)名稱 | 描述 | 備注 |
---|---|---|
register() | 注冊(cè)一個(gè)party,使得parties+1 | / |
bulkRegister(int parties) | 批量注冊(cè)party,使得parties變?yōu)橐延袀€(gè)數(shù)與傳入?yún)?shù)之和 | / |
arriveAndDeregister() | 當(dāng)前任務(wù)已完成,使parties計(jì)數(shù)減1,不會(huì)形成阻塞 | / |
arriveAndAwaitAdvance() | 已達(dá)到執(zhí)行點(diǎn),線程阻塞,等待下一階段喚醒繼續(xù)執(zhí)行 | / |
awaitAdvance(int phase) | 參數(shù)是一個(gè)已完成的階段編號(hào),通常以已完成任務(wù)的arrive或者arriveAndDeregister函數(shù)的返回值作為取值,如果傳入?yún)?shù)的階段編號(hào)和當(dāng)前階段編號(hào)相同,則在此處等待,如果不同或者Phaser已經(jīng)是terminated狀態(tài),則立即返回 | / |
arrive() | 達(dá)到當(dāng)前階段,不等待其他參與者到達(dá) | / |
arriveAndAwaitAdvance
以上述政治考試為例,學(xué)習(xí)Phaser基本使用
public static void main(String[] args) {
// 創(chuàng)建Phaser
Phaser phaser = new Phaser(){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("政治考試完成");
break;
case 1:
System.out.println("歷史考試完成");
break;
case 2:
System.out.println("地理考試完成");
break;
}
// 如果到達(dá)某一階段,Phaser中參與者為0,則會(huì)銷毀該P(yáng)haser
return super.onAdvance(phase, registeredParties);
}
};
IntStream.range(1,10).forEach(number->{
phaser.register();
Thread student= new Thread(()->{
System.out.println("學(xué)生"+number+"arrive advance");
// 等待其他線程,此時(shí)block
phaser.arriveAndAwaitAdvance();
System.out.println("學(xué)生"+number+"政治開始答題");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("學(xué)生"+number+"政治交卷");
// 考試完成,取消計(jì)數(shù),參與者減1
phaser.arriveAndDeregister();
System.out.println("Phaser is terminated :" +phaser.isTerminated());
});
student.start();
});
System.out.println("Phaser is terminated :" +phaser.isTerminated());
}
輸出如下:
從上面可以看出,Phaser中通過arriveAndAwaitAdvance阻塞當(dāng)前線程,當(dāng)所有線程到達(dá)阻塞柵欄時(shí),喚醒等待線程繼續(xù)執(zhí)行,進(jìn)而達(dá)到線程間同步協(xié)作。
awaitAdvance
有時(shí)候,當(dāng)Phaser 在當(dāng)前階段結(jié)束時(shí),我們需要兜底做一些策略,比如說資源的釋放,狀態(tài)的檢查上報(bào)等,此時(shí)就需要用到awaitAdvance,awaitAdvance接受一個(gè)階段編號(hào),如果當(dāng)前階段編號(hào)和傳入的相等,則會(huì)進(jìn)入等待狀態(tài),等到所有參與者都到達(dá)該階段柵欄時(shí),被喚醒。實(shí)例代碼如下:
public static class ThreadA implements Runnable {
private Phaser phaser;
public ThreadA(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start ");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " end " );
}
}
public static class ThreadB implements Runnable {
private Phaser phaser;
public ThreadB(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start " );
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " end ");
}
}
public static class ThreadC implements Runnable {
private Phaser phaser;
public ThreadC(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start ");
System.out.println(Thread.currentThread().getName() + " phaser.getPhase()=" + phaser.getPhase());
phaser.awaitAdvance(0);
System.out.println(Thread.currentThread().getName() + " end ");
}
}
public static class ThreadD implements Runnable {
private Phaser phaser;
public ThreadD(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " begin sleep");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " sleep completed ");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " end ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 聲明Phaser
Phaser phaser = new Phaser(3) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phaser arrived at :"+phase);
return super.onAdvance(phase, registeredParties);
}
};
Thread t1 = new Thread(new ThreadA(phaser));
Thread t2 = new Thread(new ThreadB(phaser));
Thread t3 = new Thread(new ThreadC(phaser));
Thread t4 = new Thread(new ThreadD(phaser));
t1.setName("ThreadA");
t2.setName("ThreadB");
t3.setName("ThreadC");
t4.setName("ThreadD");
t1.start();
t2.start();
t3.start();
t4.start();
}
如上代碼所示,聲明Phaser有三個(gè)參與者ThreadA,ThreadB,ThreadD,在三個(gè)參與者都執(zhí)行到arriveAndAwaitAdvance之前,ThreadC 阻塞等待,當(dāng)三個(gè)參與者都執(zhí)行到arriveAndAwaitAdvance后,回調(diào)onAdvance方法,此時(shí)被阻塞的參與者被喚醒執(zhí)行,之后ThreadC被喚醒繼續(xù)執(zhí)行,運(yùn)行結(jié)果如下:
Exchanger
Exchanger用于兩個(gè)線程之間的通信,無論哪個(gè)線程先調(diào)用Exchanger,都會(huì)等待另外一個(gè)線程調(diào)用時(shí)進(jìn)行數(shù)據(jù)交換,示例代碼如下:
private static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" sleep start");
Thread.sleep(10000);
System.out.println(Thread.currentThread().getName()+" sleep end");
System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
String aa = exchanger.exchange("data from Thread1");
System.out.println(Thread.currentThread().getName() + " "+aa);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread1").start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
String bb = exchanger.exchange("data from Thread2");
System.out.println(Thread.currentThread().getName() + " "+bb);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread2").start();
}
運(yùn)行輸出如下:
總結(jié)
結(jié)合前文,我們一共學(xué)習(xí)了種線程間通信方式,主要有:文章來源:http://www.zghlxwxcb.cn/news/detail-426974.html
- Object.wait/Object.notify/Object.notifyAll + synchronized
- Semaphore(信號(hào)量)
- CountDownLatch
- CyclicBarrier
- Condition+ReentrantLock
- Phaser
- Exchanger
大家日常開發(fā)中可靈活使用,針對(duì)各通信方式比較見下表:文章來源地址http://www.zghlxwxcb.cn/news/detail-426974.html
通信方式 | 應(yīng)用場景 | 是否可重用 | 子任務(wù)異常處理 | 備注 |
---|---|---|---|---|
Object.wait/Object.notify/Object.notifyAll + synchronized | 大多數(shù)線程通信場景 | 是 | 依賴開發(fā)者維護(hù),在finally塊中完成釋放,避免死鎖 | / |
Semaphore(信號(hào)量) | 通知喚醒類線程間通信場景 | 是 | 依賴開發(fā)者維護(hù),在finally塊中釋放信號(hào)量,避免死鎖 | / |
CountDownLatch | 串行多線程運(yùn)行場景 | 否 | 不加處理的話,子任務(wù)發(fā)生異常導(dǎo)致退出,則所有等待的線程都會(huì)一致等待,直到超時(shí)時(shí)間來臨 | / |
CyclicBarrier | 聚合類線程通信場景 | 否 | 不加處理的話,如果在所有線程都到達(dá)屏障陷入阻塞前,如果有線程發(fā)生異常導(dǎo)致未到達(dá)柵欄提前退出,則所有等待在柵欄都會(huì)以BrokenBarrierException或InterruptedException異常退出 | / |
Condition+ReentrantLock | 大多數(shù)線程通信場景 | 是 | 依賴開發(fā)者維護(hù),在finally塊中完成釋放,避免死鎖 | / |
Phaser | 適用CountDownLatch與CyclicBarrier組合場景 | 是 | 依賴開發(fā)者維護(hù),在finally塊中取消參與者,避免死鎖 | / |
Exchanger | 線程間數(shù)據(jù)交換場景 | 是 | 依賴開發(fā)者維護(hù),確保兩個(gè)線程狀態(tài)正常,并行運(yùn)行 | / |
到了這里,關(guān)于Java線程間通信方式(3)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!