JDK并發(fā)包中常用并發(fā)工具類:
CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種并發(fā)流程控制的手段;
Exchanger工具類則提供了在線程間交換數(shù)據(jù)的一種手段。
等待多線程完成的CountDownLatch
CountDownLatch允許一個或多個線程等待其他線程完成操作。
需求:解析一個Excel里多個sheet的數(shù)據(jù),可以考慮使用多線程,每個線程解析一個sheet里的數(shù)據(jù),等到所有的sheet都解析完之后,程序需要提示解析完成。
實現(xiàn)主線程等待所有線程完成sheet的解析操作,最簡單的做法是使用join()方法
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(() -> {
});
Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
join用于讓當(dāng)前執(zhí)行線程等待join線程執(zhí)行結(jié)束
。實現(xiàn)原理是不停檢查join線程是否存活,如果join線程存活則讓當(dāng)前線程永遠(yuǎn)等待。其中,wait(0)表示永遠(yuǎn)等待下去。代碼片段如下:
while (isAlive()) {
wait(0);
}
直到j(luò)oin線程中止后,線程的this.notifyAll()方法會被調(diào)用,調(diào)用notifyAll()方法是在JVM里實現(xiàn)的,在JDK里看不到。
CountDownLatch也可以實現(xiàn)join的功能,并且比join的功能更多
。
public class CountDownLatchTest {
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的構(gòu)造函數(shù)接收一個int類型的參數(shù)作為計數(shù)器,等待N個點完成,這里就傳入N。
調(diào)用CountDownLatch的countDown方法
時,N就會減1
,CountDownLatch的await方法會阻塞當(dāng)前線程,直到N變成零
。
countDown方法可以用在任何地方,N個點,可以是N個線程,也可以是1個線程里的N個執(zhí)行步驟。
用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程里即可。
如果有某個解析sheet的線程處理得比較慢,不可能讓主線程一直等待,可以使用另外一個帶指定時間的await方法——await(long time,TimeUnit unit),這個方法等待特定時間后,就會不再阻塞當(dāng)前線程
。join也有類似的方法。
同步屏障CyclicBarrier
讓一組線程到達(dá)一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達(dá)屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。
默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)
,其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(() -> {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
主線程和子線程的調(diào)度是由CPU決定的,兩個線程都有可能先執(zhí)行。
輸出結(jié)果可能有兩種:
一種是:
另一種:
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),主線程和子線程會永遠(yuǎn)等待,因為沒有第三個線程執(zhí)行await方法,即沒有第三個線程到達(dá)屏障,所以之前到達(dá)屏障的兩個線程都不會繼續(xù)執(zhí)行。
更高級的構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrierAction)
,用于在線程到達(dá)屏障時,優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景。
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(() -> {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
初始值設(shè)為2,等代碼中的第一個線程和線程A都執(zhí)行完之后,才會繼續(xù)執(zhí)行主線程,然后輸出2。結(jié)果一定是:
CyclicBarrier的應(yīng)用場景
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景。
public class BankWaterService implements Runnable {
/**
* 創(chuàng)建4個屏障,處理完之后執(zhí)行當(dāng)前類的run方法
*/
private CyclicBarrier c = new CyclicBarrier(4, this);
/**
* 假設(shè)只有4個sheet,只啟動4個線程
*/
private final Executor executor = Executors.newFixedThreadPool(4);
/**
* 保存每個sheet計算出的銀流結(jié)果
*/
private final ConcurrentHashMap<String, Integer> sheetBankWaterCount = new
ConcurrentHashMap<>();
private void count() {
for (int i = 0; i < 4; i++) {
executor.execute(() -> {
// 計算當(dāng)前sheet的銀流數(shù)據(jù),計算代碼省略
sheetBankWaterCount
.put(Thread.currentThread().getName(), 1);
// 銀流計算完成,插入一個屏障
try {
c.await();
} catch (InterruptedException |
BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
@Override
public void run() {
int result = 0;
// 匯總每個sheet計算出的結(jié)果
for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
result += sheet.getValue();
}
// 將結(jié)果輸出
sheetBankWaterCount.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService bankWaterCount = new BankWaterService();
bankWaterCount.count();
}
}
計算銀行流水,一個sheet開啟一個線程,所有線程執(zhí)行完畢,將所有計算結(jié)果相加得銀行總流水。
最后輸出結(jié)果為4。
CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能使用一次
,而CyclicBarrier的計數(shù)器
可以使用reset()方法重置
。CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景。例如,如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程重新執(zhí)行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數(shù)量
。isBroken()方法用來了解阻塞的線程是否被中斷
。
控制并發(fā)線程數(shù)的Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。
應(yīng)用場景
Semaphore可以用于做流量控制
,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接。
需求:要讀取幾萬個文件的數(shù)據(jù),因為都是IO密集型任務(wù),可以啟動幾十個線程并發(fā)地讀取,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中,而數(shù)據(jù)庫的連接數(shù)只有10個,這時必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接。此時就使用Semaphore來做流量控制。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static final ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
private static final Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(() -> {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
});
}
threadPool.shutdown();
}
}
有30個線程在執(zhí)行,但是只允許10個并發(fā)執(zhí)行。
構(gòu)造方法Semaphore(int permits)接受一個整型的數(shù)字
,表示可用的許可證數(shù)量。Semaphore(10)表示允許10個線程獲取許可證,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證
,使用完之后調(diào)用release()方法歸還許可證
。還可以用tryAcquire()方法嘗試獲取許可證。
一些其他方法:
線程間交換數(shù)據(jù)的Exchanger
Exchanger(交換者)是一個用于線程間協(xié)作的工具類。
用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。
兩個線程通過exchange方法交換數(shù)據(jù),如果第一個線程先執(zhí)行exchange()方法,它會一直等待第二個線程也執(zhí)行exchange方法,當(dāng)兩個線程都到達(dá)同步點時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。
Exchanger可以用于遺傳算法
。遺傳算法里需要選出兩個人作為交配對象,這時候會交換兩人的數(shù)據(jù),并使用交叉規(guī)則得出2個交配結(jié)果。
Exchanger也可以用于校對工作
。
需求:將紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進(jìn)行錄入,錄入到Excel之后,系統(tǒng)需要加載這兩個Excel,并對兩個Excel數(shù)據(jù)進(jìn)行校對,看看是否錄入一致。
兩個線程間數(shù)據(jù)傳遞。
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<>();
private static final ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(() -> {
try {
// A錄入銀行流水?dāng)?shù)據(jù)
String A = "銀行流水A";
String B = exgr.exchange(A);
System.out.println("B----- " + B);
} catch (InterruptedException e) {
}
});
threadPool.execute(() -> {
try {
// B錄入銀行流水?dāng)?shù)據(jù)
String B = "銀行流水B";
String A = exgr.exchange("B");
System.out.println("A和B數(shù)據(jù)是否一致:" + A.equals(B) + ",A錄入的是:"
+ A + ",B錄入是:" + B);
} catch (InterruptedException e) {
}
});
threadPool.shutdown();
}
}
輸出結(jié)果:
文章來源:http://www.zghlxwxcb.cn/news/detail-671972.html
如果兩個線程有一個沒有執(zhí)行exchange()方法,則會一直等待,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設(shè)置最大等待時長
。文章來源地址http://www.zghlxwxcb.cn/news/detail-671972.html
到了這里,關(guān)于Java并發(fā)工具類的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!