CountDownLatch 和 CyclicBarrier:如何讓多線程步調(diào)一致?
原始對(duì)賬系統(tǒng)
- 對(duì)賬系統(tǒng)的業(yè)務(wù)簡(jiǎn)化后:
- 首先用戶通過(guò)在線商城下單,會(huì)生成電子訂單,保存在訂單庫(kù);
- 之后物流會(huì)生成派送單給用戶發(fā)貨,派送單保存在派送單庫(kù)。
- 為了防止漏派送或者重復(fù)派送,對(duì)賬系統(tǒng)每天還會(huì)校驗(yàn)是否存在異常訂單。
- 目前對(duì)賬系統(tǒng)的處理邏輯是首先查詢訂單,然后查詢派送單,之后對(duì)比訂單和派送單,將差異寫入差異庫(kù)。
- 對(duì)賬系統(tǒng)的核心代碼如下,就是在一個(gè)單線程里面循環(huán)查詢訂單、派送單,然后執(zhí)行對(duì)賬,最后將寫入差異庫(kù)。
while (存在未對(duì)賬訂單) { // 查詢未對(duì)賬訂單 pos = getPOrders(); // 查詢派送單 dos = getDOrders(); // 執(zhí)?對(duì)賬操作 diff = check(pos, dos); // 差異寫?差異庫(kù) save(diff); }
利用并行優(yōu)化對(duì)賬系統(tǒng)
- 對(duì)于串行化的系統(tǒng),優(yōu)化性能首先想到的是能否利用多線程并行處理。
- 查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 是否可以并行處理呢?
- 顯然是可以的,因?yàn)檫@兩個(gè)操作并沒(méi)有先后順序的依賴。
while () { // 查詢未對(duì)賬訂單 Thread t1 = new Thread(() -> { pos = getPOrders(); }); t1.start(); // 查詢派送單 Thread t2 = new Thread(()->{ dos = getDOrders(); }); t2.start(); // 等待 t1、t2 結(jié)束 t1.join(); t2.join(); // 執(zhí)?對(duì)賬操作 diff = check(pos, dos); // 差異寫?差異庫(kù) save(diff); }
用 CountDownLatch 實(shí)現(xiàn)線程等待
- while 循環(huán)里面每次都會(huì)創(chuàng)建新的線程,而創(chuàng)建線程可是個(gè)耗時(shí)的操作。所以最好是創(chuàng)建出來(lái)的線程能夠循環(huán)利用,線程池就能解決這個(gè)問(wèn)題。
Executor executor = Executors.newFixedThreadPool(2); while (存在未對(duì)賬訂單) { // 計(jì)數(shù)器初始化為 2 CountDownLatch latch = new CountDownLatch(2); // 查詢未對(duì)賬訂單 executor.execute(() -> { pos = getPOrders(); latch.countDown(); }); // 查詢派送單 executor.execute(()-> { dos = getDOrders(); latch.countDown(); }); // 等待兩個(gè)查詢操作結(jié)束 latch.await(); // 執(zhí)?對(duì)賬操作 diff = check(pos, dos); // 差異寫?差異庫(kù) save(diff); }
- 我們將 getPOrders() 和 getDOrders() 這兩個(gè)查詢操作并行了,但這兩個(gè)查詢操作和對(duì)賬操作 check()、save() 之間還是串行的。很顯然,這兩個(gè)查詢操作和對(duì)賬操作也是可以并行的,也就是說(shuō),在執(zhí)行對(duì)賬操作的時(shí)候,可以同時(shí)去執(zhí)行下一輪的查詢操作。
- 針對(duì)對(duì)賬這個(gè)項(xiàng)目,我設(shè)計(jì)了兩個(gè)隊(duì)列,并且兩個(gè)隊(duì)列的元素之間還有對(duì)應(yīng)關(guān)系。
- 訂單查詢操作將訂單查詢結(jié)果插入訂單隊(duì)列,派送單查詢操作將派送單插入派送單隊(duì)列,這兩個(gè)隊(duì)列的元素之間是有對(duì)應(yīng)關(guān)系的。
- 兩個(gè)隊(duì)列的好處是,對(duì)賬操作可以每次從訂單隊(duì)列出一個(gè)元素,從派送單隊(duì)列出一個(gè)元素,然后對(duì)這兩個(gè)元素執(zhí)行對(duì)賬操作,這樣數(shù)據(jù)一定不會(huì)亂掉。
- ?個(gè)線程 T1 執(zhí)行訂單的查詢工作,一個(gè)線程 T2 執(zhí)行派送單的查詢工作,當(dāng)線程 T1 和 T2 都各自生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候,通知線程 T3 執(zhí)行對(duì)賬操作。
- 線程 T1 和線程 T2 只有都生產(chǎn)完 1 條數(shù)據(jù)的時(shí)候,才能一起向下執(zhí)行,也就是說(shuō),線程 T1 和線程 T2 要互相等待,步調(diào)要一致。
- 同時(shí)當(dāng)線程 T1和 T2 都生產(chǎn)完一條數(shù)據(jù)的時(shí)候,還要能夠通知線程 T3 執(zhí)行對(duì)賬操作。
用 CyclicBarrier 實(shí)現(xiàn)線程同步
- 我們首先創(chuàng)建了一個(gè)計(jì)數(shù)器初始值為 2 的 CyclicBarrier,你需要注意的是創(chuàng)建 CyclicBarrier 的時(shí)候,我們還傳入了一個(gè)回調(diào)函數(shù),當(dāng)計(jì)數(shù)器減到 0 的時(shí)候,會(huì)調(diào)用這個(gè)回調(diào)函數(shù)。
- 線程 T1 負(fù)責(zé)查詢訂單,當(dāng)查出一條時(shí),調(diào)用 barrier.await() 來(lái)將計(jì)數(shù)器減 1,同時(shí)等待計(jì)數(shù)器變成 0;
- 線程 T2 負(fù)責(zé)查詢派送單,當(dāng)查出一條時(shí),也調(diào)用 barrier.await() 來(lái)將計(jì)數(shù)器減 1,同時(shí)等待計(jì)數(shù)器變成 0;
- 當(dāng) T1 和 T2 都調(diào)用 barrier.await() 的時(shí)候,計(jì)數(shù)器會(huì)減到 0,此時(shí) T1 和 T2 就可以執(zhí)行下?條語(yǔ)句了,同時(shí)會(huì)調(diào)用 barrier 的回調(diào)函數(shù)來(lái)執(zhí)行對(duì)賬操作。
- CyclicBarrier 的計(jì)數(shù)器有自動(dòng)重置的功能,當(dāng)減到 0 的時(shí)候,會(huì)自動(dòng)重置你設(shè)置的初始值。
// 訂單隊(duì)列 Vector<P> pos; // 派送單隊(duì)列 Vector<D> pos; // 執(zhí)?回調(diào)的線程池 Executor executor = Executors.newFixedThreadPool(1); final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(() -> check()); }); void check() { P p = pos.remove(0); D d = dos.remove(0); // 執(zhí)?對(duì)賬操作 diff = check(p, d); // 差異寫?差異庫(kù) save(diff); } void checkAll() { // 循環(huán)查詢訂單庫(kù) Thread t1 = new Thread(() -> { while (存在未對(duì)賬訂單) { // 查詢訂單庫(kù) pos.add(getPOrders()); // 等待 barrier.await(); } }); t1.start(); // 循環(huán)查詢運(yùn)單庫(kù) Thread T2 = new Thread(()->{ while(存在未對(duì)賬訂單){ // 查詢運(yùn)單庫(kù) dos.add(getDOrders()); // 等待 barrier.await(); } }); T2.start(); }
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-476429.html
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-476429.html
到了這里,關(guān)于《Java并發(fā)編程實(shí)戰(zhàn)》課程筆記(十二)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!