国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Java中的并發(fā)工具類

這篇具有很好參考價(jià)值的文章主要介紹了Java中的并發(fā)工具類。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

系列文章目錄

JUC篇:volatile可見性的實(shí)現(xiàn)原理
JUC篇:synchronized的應(yīng)用和實(shí)現(xiàn)原理
JUC篇:用Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池
JUC篇:java中的線程池
JUC篇:ThreadLocal的應(yīng)用與原理



前言

在JDK的并發(fā)包里提供了幾個(gè)非常有用的并發(fā)工具類。CountDownLatch、CyclicBarrier和
Semaphore工具類提供了一種并發(fā)流程控制的手段。本文會(huì)配合一些應(yīng)用場(chǎng)景來介紹如何使用這些工具類并解析其中的原理。


一、等待多線程完成的CountDownLatch

CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作。

在日常開發(fā)中經(jīng)常會(huì)遇到需要在主線程中開啟多個(gè)線程去并行執(zhí)行任務(wù),并且主線程需要等待所有子線程執(zhí)行完畢后再進(jìn)行匯總的場(chǎng)景。在CountDownLatch出現(xiàn)之前一般都使用線程的join()方法來實(shí)現(xiàn)這一點(diǎn),但是join方法不夠靈活,不能夠滿足不同場(chǎng)景的需要,所以JDK開發(fā)組提供了CountDownLatch這個(gè)類

1.1案例介紹

假如有這樣一個(gè)需求:我們需要解析一個(gè)Excel里多個(gè)sheet的數(shù)據(jù),此時(shí)可以考慮使用多線程,每個(gè)線程解析一個(gè)sheet里的數(shù)據(jù),等到所有的sheet都解析完之后,程序需要提示解析完成。在這個(gè)需求中,要實(shí)現(xiàn)主線程等待所有線程完成sheet的解析操作,最簡(jiǎn)單的做法是使用join()方法

//用join()實(shí)現(xiàn)主線程等待全部子線程執(zhí)行完之后再執(zhí)行
public class JoinCountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(()->{
            System.out.println("線程1....start");
        },"線程1");

        Thread thread2 = new Thread(() -> {
            System.out.println("線程2.。。。start");
        }, "線程2");

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();
        System.out.println("主線程。。。。。start");
    }
}
//用CountDownLatch實(shí)現(xiàn)主線程等待全部子線程執(zhí)行完之后再執(zhí)行
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        AtomicInteger res= new AtomicInteger();
        Thread thread1 = new Thread(()->{
            System.out.println("線程1....start");
            for (int i=0;i<10000;i++){
                res.incrementAndGet();
            }
            countDownLatch.countDown();
        },"線程1");

        Thread thread2 = new Thread(() -> {
            System.out.println("線程2.。。。start");
            for (int i=0;i<10000;i++){
                res.incrementAndGet();
            }
            countDownLatch.countDown();
        }, "線程2");

        thread1.start();
        thread2.start();

        countDownLatch.await();
        System.out.println("主線程。。。。。start"+res);

    }
}

總結(jié)CountDownLatch與join方法的區(qū)別

  • 一個(gè)區(qū)別是,調(diào)用一個(gè)子線程的join()方法后,該線程會(huì)一直被阻塞直到子線程運(yùn)行完畢,而CountDownLatch則使用計(jì)數(shù)器來允許子線程運(yùn)行完畢或者在運(yùn)行中遞減計(jì)數(shù),也就是CountDownLatch可以在子線程運(yùn)行的任何時(shí)候讓await方法返回而不一定必須等到線程結(jié)束。
  • 另外,使用線程池來管理線程時(shí)一般都是直接添加Runable到線程池,這時(shí)候就沒有辦法再調(diào)用線程的join方法了,就是說countDownLatch相比join方法讓我們對(duì)線程同步有更靈活的控制。

1.2實(shí)現(xiàn)原理

CountDownLatch的名字就可以猜測(cè)其內(nèi)部應(yīng)該有個(gè)計(jì)數(shù)器,并且這個(gè)計(jì)數(shù)器是遞減的。下面就通過源碼看看JDK開發(fā)組在何時(shí)初始化計(jì)數(shù)器,在何時(shí)遞減計(jì)數(shù)器,當(dāng)計(jì)數(shù)器變?yōu)?日才做了什么操作,多個(gè)線程是如何通過計(jì)時(shí)器值實(shí)現(xiàn)同步的。為了一覽CountDownLatch的內(nèi)部結(jié)構(gòu),我們先看它的類圖

Java中的并發(fā)工具類

從類圖可以看出,CountDownLatch是使用AQS實(shí)現(xiàn)的。通過下面的構(gòu)造函數(shù),你會(huì)發(fā)現(xiàn),實(shí)際上是把計(jì)數(shù)器的值賦給了AQS的狀態(tài)變量state,也就是這里使用AQS的狀態(tài)值來表示計(jì)數(shù)器值。

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

Sync(int count) {
            setState(count);
        }

下面我們來研究CountDownLatch中的幾個(gè)重要的方法,看它們是如何調(diào)用AQS來實(shí)現(xiàn)功能的。

1.2.1void await()方法

當(dāng)線程調(diào)用CountDownLatch對(duì)象的await方法后,當(dāng)前線程會(huì)被阻塞,直到下面的情況之一發(fā)生才會(huì)返回

  • 當(dāng)所有線程都調(diào)用了CountDownLatch對(duì)象的countDown方法后,也就是計(jì)數(shù)器的值為0時(shí)

  • 其他線程調(diào)用了當(dāng)前線程的interrupt()方法中斷了當(dāng)前線程,當(dāng)前線程就會(huì)拋出InterruptedException異常,然后返回

下面看下在await()方法內(nèi)部是如何調(diào)用AQS的方法的。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

從以上代碼可以看到,await()方法委托sync調(diào)用了AQS的acquiresharedlnterruptibIy方法,后者的代碼如下:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    	//線程被中斷拋出異常
        if (Thread.interrupted())
            throw new InterruptedException();
    	//查看計(jì)數(shù)器值是否為0,為0直接返回,不為0進(jìn)入AQS的隊(duì)列等待
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

由如上代碼可知,該方法的特點(diǎn)是線程獲取資源時(shí)可以被中斷,并且獲取的資源是共享資源。acquireSharedInterruptibly首先判斷當(dāng)前線程是否己被中斷,若是則拋出異常,否則調(diào)用sync實(shí)現(xiàn)的tryAcquireShared方法查看當(dāng)前狀態(tài)值(計(jì)數(shù)器值)是否為0,是則
當(dāng)前線程的await()方法直接返回,否則調(diào)用AQS的doAcquireSharedlnterruptibly方法讓當(dāng)前線程阻塞。

另外可以看到,這里tryAcquireShared傳遞的arg參數(shù)沒有被用到,調(diào)用tryAcquireShared的方法僅僅是為了檢查當(dāng)前狀態(tài)值是不是為0,并沒有調(diào)用CAS讓當(dāng)前狀態(tài)值減1

1.2.2.void countDown()方法

線程調(diào)用該方法后,計(jì)數(shù)器的值遞減,遞減后如果計(jì)數(shù)器值為0則喚醒所有因調(diào)用await方法而被阻塞的線程,否則什么都不做。下面看下countDown()方法是如何調(diào)用AQS的方法的。

public void countDown() {
    	//委托sync調(diào)用AQS的方法
        sync.releaseShared(1);
    }

由如上代碼可知,CountDownLatch的countDown()方法委托sync調(diào)用了AQS的releaseShared方法,后者的代碼如下。

public final boolean releaseShared(int arg) {
    	//調(diào)用sync實(shí)現(xiàn)的tryReleaseShared
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

在如上代碼中,releaseShared首先調(diào)用了sync實(shí)現(xiàn)的AQS的tryReleaseShared方法,其代碼如下。

protected boolean tryReleaseShared(int releases) {
            //循環(huán)進(jìn)行CAS,直到當(dāng)前線程成功完成CAS使計(jì)數(shù)器值(狀態(tài)值state)減l并更新圭1]state

            for (;;) {
                int c = getState();
                //如果當(dāng)前狀態(tài)值為0,直接返回(1)
                if (c == 0)
                    return false;
                int nextc = c-1;
                //使用CAS讓計(jì)數(shù)器減1(2)
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

如上代碼首先獲取當(dāng)前狀態(tài)值(計(jì)數(shù)器值)。代碼(1)判斷如果當(dāng)前狀態(tài)值為0則直接返回false,從而countDown()方法直接返回:否則執(zhí)行代碼(2)使用CAS將計(jì)數(shù)器值減1,CAS失敗則循環(huán)重試,否則如果當(dāng)前計(jì)數(shù)器值為0則返回true,返回true說明是最后一個(gè)線程調(diào)用的countdown方法,那么該線程除了讓計(jì)數(shù)器值減1外,還需要喚醒因調(diào)用CountDownLatch的await方法而被阻塞的線程,具體是調(diào)用AQS的doReleaseShared方法來激活阻塞的線程。這里代碼(1)貌似是多余的,其實(shí)不然,之所以添加代碼(1)是為了防止當(dāng)計(jì)數(shù)器值為0后,其他線程又調(diào)用了countDown方法,如果沒有代碼(1)狀態(tài)值就可能會(huì)變成負(fù)數(shù)。

1.3小結(jié)

本節(jié)首先介紹了CountDownLatch的使用,相比使用join方法來實(shí)現(xiàn)線程間同步,前者更具有靈活性和方便性。

另外還介紹了CountDownLatch的原理,CountDownLatch是使用AQS實(shí)現(xiàn)的。使用AQS的狀態(tài)變量來存放計(jì)數(shù)器的值。
首先在初始化CountDownLatch時(shí)設(shè)置狀態(tài)值(計(jì)數(shù)器值),當(dāng)多個(gè)線程調(diào)用countdown方法時(shí)實(shí)際是原子性遞減AQS的狀態(tài)值。當(dāng)線程調(diào)用await方法后當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列等待計(jì)數(shù)器為0再返回。其他線程調(diào)用countdown方法讓計(jì)數(shù)器值遞減1,當(dāng)計(jì)數(shù)器值變?yōu)?時(shí),當(dāng)前線程還要調(diào)用AQS的doReleaseShared方法來激活由于調(diào)用await()方法而被阻塞的線程。

二、同步屏障CyclicBarrier

CountDownLatch在解決多個(gè)線程同步方面相對(duì)于調(diào)用線程的join方法己經(jīng)有了不少優(yōu)化,但是CountDownLatch的計(jì)數(shù)器是一次性的,也就是等到計(jì)數(shù)器值變?yōu)?后,再調(diào)用CountDownLatch的await和countdown方法都會(huì)立刻返回,這就起不到線程同步的效果了。

所以為了滿足計(jì)數(shù)器可以重置的需要,JDK開發(fā)組提供了CyclicBarrier類,并且CyclicBarrier類的功能并不限于CountDownLatch的功能。從字面意思理解,CyclicBarrier是回環(huán)屏障的意思,它可以讓一組線程全部達(dá)到一個(gè)狀態(tài)后再全部同時(shí)執(zhí)行。這里之所以叫作回環(huán)是因?yàn)楫?dāng)所有等待線程執(zhí)行完畢,并重置CyclicBarrier的狀態(tài)后它可以被重用。之所以叫作屏障是因?yàn)榫€程調(diào)用await方法后就會(huì)被阻塞,這個(gè)阻塞點(diǎn)就稱為屏障點(diǎn),等所有線程都調(diào)用了await方法后,線程們就會(huì)沖破屏障,繼續(xù)向下運(yùn)行。

2.1案例介紹

假設(shè)一個(gè)任務(wù)由階段1、階段2和階段3組成,每個(gè)線程要串行地執(zhí)行階段1、階段2和階段3,當(dāng)多個(gè)線程執(zhí)行該任務(wù)時(shí)必須要保證所有線程的階段1全部完成后才能進(jìn)入階段2執(zhí)行,當(dāng)所有線程的階段2全部完成后才能進(jìn)入階段3執(zhí)行。下面使用CyclicBarrier來完成這個(gè)需求。

public class CyclicBarrierDemo1Test {
    //假設(shè)一個(gè)任務(wù)由階段1、階段2和階段3組成,每個(gè)線程要串行地執(zhí)行階段1、階段2和階段3,
    // 當(dāng)多個(gè)線程執(zhí)行該任務(wù)時(shí)必須要保證所有線程的階段1全部完成后才能進(jìn)入階段2執(zhí)行,當(dāng)所有線程的階段2全部完成后才能進(jìn)入階段3執(zhí)行。
    // 下面使用CyclicBarrier來完成這個(gè)需求。

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        //線程一
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+":step-1");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+":step-2");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+":step-3");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //線程二
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+":step-1");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+":step-2");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+":step-3");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //關(guān)閉線程池
        executorService.shutdown();


    }
}

輸出結(jié)果
Java中的并發(fā)工具類
在如上代碼中,每個(gè)子線程在執(zhí)行完階段1后都調(diào)用了await方法,等到所有線程都到達(dá)屏障點(diǎn)后才會(huì)一塊往下執(zhí)行,這就保證了所有線程都完成了階段1后才會(huì)開始執(zhí)行階段2。然后在階段2后面調(diào)用了await方法,這保證了所有線程都完成了階段2后,才能開始階段3的執(zhí)行。這個(gè)功能使用單個(gè)CountDownLatch是無法完成的。

2.2實(shí)現(xiàn)原理

CyclicBarrier的類圖結(jié)構(gòu)
Java中的并發(fā)工具類
由以上類圖可知,CyclicBrrier基于獨(dú)占鎖實(shí)現(xiàn)本質(zhì)底層還是基于AQS的。parties用來記錄線程個(gè)數(shù),這里表示多少線程調(diào)用await后,所有線程才會(huì)沖破屏障繼續(xù)往下運(yùn)行。而count一開始等于parties,每當(dāng)有線程調(diào)用await方法就遞減1,當(dāng)count為0時(shí)就表示所有線程都到了屏障點(diǎn)。

你可能會(huì)疑惑,為何維護(hù)parties和count兩個(gè)變量,只使用count不就可以了?

別忘了CyclicBrrier是可以被復(fù)用的,使用兩個(gè)變量的原因是,parties始終用來記錄總的線程個(gè)數(shù),當(dāng)count計(jì)數(shù)器值變?yōu)?后,會(huì)將parties的值賦給count,從而進(jìn)行復(fù)用。這兩個(gè)變量是在構(gòu)造CyclicBarrier對(duì)象時(shí)傳遞的,如下所示。

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

還有一個(gè)變量barrierCommand也通過構(gòu)造函數(shù)傳遞,這是一個(gè)任務(wù),這個(gè)任務(wù)的執(zhí)行時(shí)機(jī)是當(dāng)所有線程都到達(dá)屏障點(diǎn)后。

使用lock首先保證了更新計(jì)數(shù)器count的原子性。

另外使用lock的條件變量trip支持線程間使用await和signal操作進(jìn)行同步。

最后,在變量generation內(nèi)部有一個(gè)變量broken,其用來記錄當(dāng)前屏障是否被打破。

注意,這里的broken并沒有被聲明為volatile的,因?yàn)槭窃阪i內(nèi)使用變量,所以不需要聲明。

 private static class Generation {
        boolean broken = false;
    }

下面來看CyclicBarrier中的幾個(gè)重要的方法。

2.2.1int await()方法

當(dāng)前線程調(diào)用CyclicBarrier的該方法時(shí)會(huì)被阻塞,直到滿足下面條件之一才會(huì)返回:

  • parties個(gè)線程都調(diào)用了await()方法,也就是線程都到了屏障點(diǎn);

  • 其他線程調(diào)用了當(dāng)前線程的interrupt()方法中斷了當(dāng)前線程,則當(dāng)前線程會(huì)拋出InterruptedException異常而返回;

  • 與當(dāng)前屏障點(diǎn)關(guān)聯(lián)的Generation對(duì)象的broken標(biāo)志被設(shè)置為true時(shí),會(huì)拋出BrokenBarrierException異常,然后返回。

由如下代碼可知,在內(nèi)部調(diào)用了dowait方法。第一個(gè)參數(shù)為false則說明不設(shè)置超時(shí)時(shí)間,這時(shí)候第二個(gè)參數(shù)沒有意義。

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

2.2.2int dowait(boolean timed,long nanos)方法

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            //如果index==O則說明所有線程都到了屏障點(diǎn),此時(shí)執(zhí)行初始化時(shí)傳遞的任務(wù)
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //(2)執(zhí)行任務(wù)
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //(3)激活其他因調(diào)用await方法而被阻塞的線程,并重置CyclicBarrier
                    nextGeneration();
                    //返回
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // (4)如果index!=0
            
            for (;;) {
                try {
                    //(5)沒有設(shè)置超時(shí)時(shí)間
                    if (!timed)
                        trip.await();
                    //(6)設(shè)置了超時(shí)
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }


 private void nextGeneration() {
        // (7)喚醒等待隊(duì)列里面阻塞的線程
        trip.signalAll();
        // (8)重置CyclicBarrier
        count = parties;
        generation = new Generation();
    }

以上是dowait方法的主干代碼。當(dāng)一個(gè)線程調(diào)用了dowait方法后,首先會(huì)獲取獨(dú)占鎖lock,如果創(chuàng)建CycleBarrier時(shí)傳遞的參數(shù)為10,那么后面9個(gè)調(diào)用錢程會(huì)被阻塞。然后當(dāng)前獲取到鎖的線程會(huì)對(duì)計(jì)數(shù)器count進(jìn)行遞減操作,遞減后count=index=9,因?yàn)?br> index!=0所以當(dāng)前線程會(huì)執(zhí)行代碼(4)。

如果當(dāng)前線程調(diào)用的是無參數(shù)的await()方法,則這里timed=false,所以當(dāng)前線程會(huì)被放入條件變量的p的條件阻塞隊(duì)列,當(dāng)前線程會(huì)被掛起并釋放獲取的lock鎖。如果調(diào)用的是有參數(shù)的await方法則timed=true,然后當(dāng)前線程也會(huì)被放入條件變量的條件隊(duì)列并釋放鎖資源,不同的是當(dāng)前線程會(huì)在指定時(shí)間超時(shí)后自動(dòng)被激活。

當(dāng)?shù)谝粋€(gè)獲取鎖的線程由于被阻塞釋放鎖后,被阻塞的9個(gè)線程中有一個(gè)會(huì)競(jìng)爭(zhēng)到lock鎖,然后執(zhí)行與第一個(gè)線程同樣的操作,直到最后一個(gè)線程獲取到lock鎖,此時(shí)己經(jīng)有9個(gè)線程被放入了條件變量trip的條件隊(duì)列里面。最后count=index等于0,所以執(zhí)行
代碼(2),如果創(chuàng)建CyclicBarrier時(shí)傳遞了任務(wù),則在其他線程被喚醒前先執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后再執(zhí)行代碼(3),喚醒其他9個(gè)線程,并重置CyclicBarrier,然后這10個(gè)線程就可以繼續(xù)向下運(yùn)行了。

2.3小結(jié)

本節(jié)首先通過案例說明了CycleBarrier與CountDownLatch的不同在于,前者是可以復(fù)用的,并且前者特別適合分段任務(wù)有序執(zhí)行的場(chǎng)景。然后分析了CycleBarrier,其通過獨(dú)占鎖ReentrantLock實(shí)現(xiàn)計(jì)數(shù)器原子性更新,并使用條件變量隊(duì)列來實(shí)現(xiàn)線程同步。

三.控制并發(fā)線程數(shù)的Semaphore

Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。

3.1案例介紹

Semaphore可以用于做流量控制,特別是公用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫(kù)連接。假如有一個(gè)需求,要讀取幾萬個(gè)文件的數(shù)據(jù),因?yàn)槎际荌O密集型任務(wù),我們可以啟動(dòng)幾十個(gè)線程并發(fā)地讀取,但是如果讀到內(nèi)存后,還需要存儲(chǔ)到數(shù)據(jù)庫(kù)中,而數(shù)據(jù)庫(kù)的連接數(shù)只有10個(gè),這時(shí)我們必須控制只有10個(gè)線程同時(shí)獲取數(shù)據(jù)庫(kù)連接保存數(shù)據(jù),否則會(huì)報(bào)錯(cuò)無法獲取數(shù)據(jù)庫(kù)連接。這個(gè)時(shí)候,就可以使用Semaphore來做流量控制

public class SemaphoreTest {
    private static final int THREAD_COUNT=30;

    private static final ExecutorService THREAD_POOL= Executors.newFixedThreadPool(THREAD_COUNT);

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(10);
        for (int i = 0; i <THREAD_COUNT ; i++) {
            THREAD_POOL.execute(()->{
                try {
                    semaphore.acquire();
                    System.out.println("save data");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

        }
        THREAD_POOL.shutdown();
    }
}

3.2實(shí)現(xiàn)原理

Semaphore的類圖
Java中的并發(fā)工具類
由該類圖可知,Semaphore還是使用AQS實(shí)現(xiàn)的。Sync只是對(duì)AQS的一個(gè)修飾,并且Sync有兩個(gè)實(shí)現(xiàn)類,用來指定獲取信號(hào)量時(shí)是否采用公平策略。例如,下面的代碼在創(chuàng)建Semaphore時(shí)會(huì)使用一個(gè)變量指定是否使用公平策略。

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

 Sync(int permits) {
            setState(permits);
        }

在如上代碼中,Semaphore默認(rèn)采用非公平策略,如果需要使用公平策略則可以使用帶兩個(gè)參數(shù)的構(gòu)造函數(shù)來構(gòu)造Semaphore對(duì)象。另外,如CountDownLatch構(gòu)造函數(shù)傳遞的初始化信號(hào)量個(gè)數(shù)permits被賦給了AQS的state狀態(tài)變量一樣,這里AQS的state值也
表示當(dāng)前持有的信號(hào)量個(gè)數(shù)。

下面來看Semaphore實(shí)現(xiàn)的主要方法。

3.2.1void acquire()方法

當(dāng)前線程調(diào)用該方法的目的是希望獲取一個(gè)信號(hào)量資源。如果當(dāng)前信號(hào)量個(gè)數(shù)大于o,則當(dāng)前信號(hào)量的計(jì)數(shù)會(huì)減1,然后該方法直接返回。否則如果當(dāng)前信號(hào)量個(gè)數(shù)等于0,則當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列。當(dāng)其他線程調(diào)用了當(dāng)前線程的interrupt()方法中
斷了當(dāng)前線程時(shí),則當(dāng)前線程會(huì)拋出InterruptedException異常返回。下面看下代碼實(shí)現(xiàn)。

public void acquire() throws InterruptedException {
    	//傳遞參數(shù)為1,說明要獲取1個(gè)信號(hào)量資源
        sync.acquireSharedInterruptibly(1);
    }

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    	//(1)線程被中斷,拋出異常
        if (Thread.interrupted())
            throw new InterruptedException();
    	//(2)否則調(diào)用Sync子類方法嘗試獲取,這里根據(jù)構(gòu)造函數(shù)確定使用公平策略
        if (tryAcquireShared(arg) < 0)
            //如果獲取失敗則放入阻塞隊(duì)列。然后再次嘗試,如果失敗則調(diào)用park方法掛起當(dāng)前線
            doAcquireSharedInterruptibly(arg);
    }

由如上代碼可知,acquire()在內(nèi)部調(diào)用了Sync的acquireSharedlnterruptibly方法,后者會(huì)對(duì)中斷進(jìn)行響應(yīng)(如果當(dāng)前線程被中斷,則拋出中斷異常)。嘗試獲取信號(hào)量資源的AQS的方法tryAcquireShared是由Sync的子類實(shí)現(xiàn)的,所以這里分別從兩方面來討論。
先討論非公平策略NonfairSync類的t叩A(chǔ)cquireShared方法,代碼如下

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }


final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //獲取當(dāng)前信號(hào)量
                int available = getState();
                //計(jì)算當(dāng)前剩余值
                int remaining = available - acquires;
                ///如果當(dāng)前剩余位小于0或者CAS設(shè)置成功則返回
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

如上代碼先獲取當(dāng)前信號(hào)量值(available),然后減去需要獲取的值(acquires),得到剩余的信號(hào)量個(gè)數(shù)(remaining),如果剩余值小于0則說明當(dāng)前信號(hào)量個(gè)數(shù)滿足不了需求,那么直接返回負(fù)數(shù),這時(shí)當(dāng)前線程會(huì)被放入AQS的阻塞隊(duì)列而被掛起。如果剩余值大于0,則使用CAS操作設(shè)置當(dāng)前信號(hào)量值為剩余值,然后返回剩余值。

另外,由于NonFairSync是非公平獲取的,也就是說先調(diào)用aquire方法獲取信號(hào)量的線程不一定比后來者先獲取到信號(hào)量??紤]下面場(chǎng)景,如果線程A先調(diào)用了aquire()方法獲取信號(hào)量,但是當(dāng)前信號(hào)量個(gè)數(shù)為0,那么線程A會(huì)被放入AQS的阻塞隊(duì)列。過一
段時(shí)間后線程C調(diào)用了release()方法釋放了一個(gè)信號(hào)量,如果當(dāng)前沒有其他線程獲取信號(hào)量,那么線程A就會(huì)被激活,然后獲取該信號(hào)量,但是假如線程C釋放信號(hào)量后,線程C調(diào)用了aquire方法,那么線程C就會(huì)和線程A去競(jìng)爭(zhēng)這個(gè)信號(hào)量資源。如果采用非公平策略,由nonfairTryAcquireShared的代碼可知,線程C完全可以在線程A被激活前,或者激活后先于線程A獲取到該信號(hào)量,也就是在這種模式下阻塞線程和當(dāng)前請(qǐng)求的線程是競(jìng)爭(zhēng)關(guān)系,而不遵循先來先得的策略。下面看公平性的FairSync類是如何保證公平性的。

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

可見公平性還是靠hasQueuedPredecessors這個(gè)函數(shù)來保證的。前面章節(jié)講過,公平策略是看當(dāng)前線程節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否也在等待獲取該資源,如果是則自己放棄獲取的權(quán)限,然后當(dāng)前線程會(huì)被放入AQS阻塞隊(duì)列,否則就去獲取。

3.2.2void acquire(int permits)方法

該方法與acquire()方法不同,后者只需要獲取一個(gè)信號(hào)量值,而前者則獲取permits個(gè)。

3.2.3void acquireUninterruptibly()方法

該方法與acquire()類似,不同之處在于該方法對(duì)中斷不響應(yīng),也就是當(dāng)當(dāng)前線程調(diào)用了acquireUninterruptibly獲取資源時(shí)(包含被阻塞后),其他線程調(diào)用了當(dāng)前線程的interrupt()方法設(shè)置了當(dāng)前線程的中斷標(biāo)志,此時(shí)當(dāng)前線程并不會(huì)拋出InterruptedException異常而返回。

3.2.4void acquireUninterruptibly(intpermits)方法

該方法與acquire(intpermits)方法的不同之處在于,該方法對(duì)中斷不響應(yīng)。

3.2.5void release()方法

該方法的作用是把當(dāng)前Semaphore對(duì)象的信號(hào)量值增加1,如果當(dāng)前有線程因?yàn)檎{(diào)用aquire方法被阻塞而被放入了AQS的阻塞隊(duì)列,則會(huì)根據(jù)公平策略選擇一個(gè)信號(hào)量個(gè)數(shù)能被滿足的線程進(jìn)行激活,激活的線程會(huì)嘗試獲取剛增加的信號(hào)量,下面看代碼實(shí)現(xiàn)。

public void release() {
    	//(1)arg=1
        sync.releaseShared(1);
    }

public final boolean releaseShared(int arg) {
    	//(2)嘗試釋放資源
        if (tryReleaseShared(arg)) {
            //(3)資源釋放成功則調(diào)用park方法喚醒AQS隊(duì)列里面最先掛起的線程
            doReleaseShared();
            return true;
        }
        return false;
    }

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //(4)獲取當(dāng)前信號(hào)量
                int current = getState();
                //(5)將當(dāng)前信號(hào)量+releases,這里加1
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //(6)使用CAS保證更新信號(hào)量的原子性
                if (compareAndSetState(current, next))
                    return true;
            }
        }



由代碼release()->sync.releaseShared(1)可知,release方法每次只會(huì)對(duì)信號(hào)量值增加1,tryReleaseShared方法是無限循環(huán),使用CAS保證了release方法對(duì)信號(hào)量遞增1的原子性操作。tryReleaseShared方法增加信號(hào)量值成功后會(huì)執(zhí)行代碼(3),即調(diào)用AQS的方法來激活因?yàn)檎{(diào)用aquire方法而被阻塞的線程。

3.3小結(jié)

本節(jié)首先通過案例介紹了Semaphore的使用方法.然后介紹了Semaphore的源碼實(shí)現(xiàn),Semaphore也是使用AQS實(shí)現(xiàn)的,并且獲取信號(hào)量時(shí)有公平策略和非公平策略之分。

總結(jié)

本文介紹了并發(fā)包中關(guān)于線程協(xié)作的一些重要類。文章來源地址http://www.zghlxwxcb.cn/news/detail-423740.html

  • 首先CountDownLatch通過計(jì)數(shù)器提供了更靈活的控制,只要檢測(cè)到計(jì)數(shù)器值為0,就可以往下執(zhí)行,這相比使用join必須等待線程執(zhí)行完畢后主線程才會(huì)繼續(xù)向下運(yùn)行更靈活。
  • 另外,CyclicBarrier也可以達(dá)到CountDownLatch的效果,但是后者在計(jì)數(shù)器值變?yōu)?后,就不能再被復(fù)用,而前者則可以使用reset方法重置后復(fù)用,前者對(duì)同一個(gè)算法但是輸入?yún)?shù)不同的類似場(chǎng)景比較適用。
  • 最后介紹了Semaphore的使用方法.然后介紹了Semaphore的源碼實(shí)現(xiàn),Semaphore也是使用AQS實(shí)現(xiàn)的,并且獲取信號(hào)量時(shí)有公平策略和非公平策略之分。
  • 使用本章介紹的類會(huì)大大減少你在Java中使用wait、notify等來實(shí)現(xiàn)線程同步的代碼量,在日常開發(fā)中當(dāng)需要進(jìn)行線程同步時(shí)使用這些同步類會(huì)節(jié)省很多代碼并且可以保證正確性。

到了這里,關(guān)于Java中的并發(fā)工具類的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • java高并發(fā)系列 - 第34篇:google提供的一些好用的并發(fā)工具類

    java高并發(fā)系列第34篇。 環(huán)境:jdk1.8。 關(guān)于并發(fā)方面的,juc已幫我們提供了很多好用的工具,而谷歌在此基礎(chǔ)上做了擴(kuò)展,使并發(fā)編程更容易,這些工具放在guava.jar包中。 本文演示幾個(gè)簡(jiǎn)單的案例,見一下guava的效果。 需要先了解的一些技術(shù):juc中的線程池、Excecutors、Execu

    2024年02月16日
    瀏覽(52)
  • java高并發(fā)系列 - 第25天:掌握J(rèn)UC中的阻塞隊(duì)列

    這是java高并發(fā)系列第25篇文章。 環(huán)境:jdk1.8。 本文內(nèi)容 掌握Queue、BlockingQueue接口中常用的方法 介紹6中阻塞隊(duì)列,及相關(guān)場(chǎng)景示例 重點(diǎn)掌握4種常用的阻塞隊(duì)列 Queue接口 隊(duì)列是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),java中用Queue接口來表示隊(duì)列。 Queue接口中定義了6個(gè)方法:

    2024年02月14日
    瀏覽(58)
  • String類中的一些常用方法(JAVA)

    String類中的一些常用方法(JAVA)

    目錄 字符串比較方法: boolean equals(Object anObject): ?int compareTo(String s): int compareToIgnoreCase(String str) 字符串查找方法: char charAt(int index): int indexOf(int ch): ?int indexOf(int ch, int fromIndex): int indexOf(String str): int indexOf(String str, int fromIndex): int lastIndexOf(int ch): int lastIndexOf(in

    2024年02月07日
    瀏覽(19)
  • 【vim 學(xué)習(xí)系列文章 5 - cscope 過濾掉某些目錄】

    【vim 學(xué)習(xí)系列文章 5 - cscope 過濾掉某些目錄】

    上篇文章:【vim 學(xué)習(xí)系列文章 4 - vim與系統(tǒng)剪切板之間的交互】 下篇文章:【vim 學(xué)習(xí)系列文章 6 – vim 如何從上次退出的位置打開文件】 第一步 創(chuàng)建自己的 cscope 腳本 ~/.local/bin/cscope.sh ,如下: 我的這個(gè)腳本首先去區(qū)分當(dāng)前執(zhí)行 cscope 命令的目錄是 rt-thread 目錄還是 linux 目

    2024年02月12日
    瀏覽(40)
  • vscode 系列文章目錄 - ctrl+鼠標(biāo)左鍵無效

    vscode 中有時(shí)會(huì)遇到 “Alt + 鼠標(biāo)點(diǎn)擊” 有效,但 “Ctrl + 鼠標(biāo)點(diǎn)擊” 無效,這時(shí)可以通過 Ctrl + , 進(jìn)行系統(tǒng)配置。 進(jìn)入VScode的首選項(xiàng),選擇設(shè)置(快捷鍵 Ctrl + , ),輸入Go to definition,找到如下兩個(gè)設(shè)置。 Editor: Multi Cursor Modifier 設(shè)置成 alt “editor.gotoLocation.multipleDefinitions” 設(shè)置

    2024年04月23日
    瀏覽(24)
  • Java中的volatile關(guān)鍵字實(shí)現(xiàn)原理

    在并發(fā)編程中,線程之間的可見性問題是非常重要的一項(xiàng)難題。Java中提供了一種解決并發(fā)可見性問題的機(jī)制,即volatile。 在本文中,我們將會(huì)講解Java中volatile的實(shí)現(xiàn)原理,為什么它能夠保證可見性,以及背后的實(shí)現(xiàn)原理涉及到的內(nèi)存屏障和JVM屏障等內(nèi)容。在學(xué)習(xí)

    2023年04月27日
    瀏覽(21)
  • Java面試題:請(qǐng)談?wù)凧ava中的volatile關(guān)鍵字?

    在Java中,volatile是一種特殊的修飾符,用于確保多線程環(huán)境下的變量 可見性和順序性 。當(dāng)一個(gè)變量被聲明為volatile時(shí),它可以確保以下兩點(diǎn): 內(nèi)存可見性 :當(dāng)一個(gè)線程修改了一個(gè)volatile變量的值,其他線程會(huì)立即看到這個(gè)改變。這是因?yàn)関olatile會(huì)禁止CPU緩存和編

    2024年04月23日
    瀏覽(22)
  • Git系列文章目錄 - Git 子模塊git submodule使用

    項(xiàng)目中有時(shí)會(huì)遇到會(huì)涉及子模塊的使用,比如 flatpak 項(xiàng)目包含多個(gè)子模塊。 進(jìn)入需要添加子模塊的目錄,一般是項(xiàng)目根目錄。 刪除子模塊目錄及源碼: 刪除項(xiàng)目目錄下.gitmodules文件中子模塊相關(guān)條目: 刪除配置項(xiàng)中子模塊相關(guān)條目: 刪除模塊下的子模塊目錄: 清除子模塊

    2024年01月20日
    瀏覽(95)
  • AIGC系列文章目錄 第一章 AIGC 與AI對(duì)話,如何寫好prompt?

    AIGC系列文章目錄 第一章 AIGC 與AI對(duì)話,如何寫好prompt?

    生成式人工智能AIGC(Artificial Intelligence Generated Content)是人工智能1.0時(shí)代進(jìn)入2.0時(shí)代的重要標(biāo)志。 AIGC對(duì)于人類社會(huì)、人工智能的意義是里程碑式的。 短期來看 AIGC改變了基礎(chǔ)的生產(chǎn)力工具, 中期來看 會(huì)改變社會(huì)的生產(chǎn)關(guān)系, 長(zhǎng)期來看 促使整個(gè)社會(huì)生產(chǎn)力發(fā)生質(zhì)的突破,在

    2024年02月06日
    瀏覽(25)
  • JUC并發(fā)編程之volatile詳解

    JUC并發(fā)編程之volatile詳解

    目錄 ? 1. volatile 1.1?volatile的作用 1.1.1?變量可見性 1.1.2?禁止指令重排序 1.2 volatile可見性案例 1.3 volatile非原子性案例 1.4 volatile 禁止重排序 1.5 volatile 日常使用場(chǎng)景 送書活動(dòng) ? 在并發(fā)編程中,多線程操作共享的變量時(shí),可能會(huì)導(dǎo)致線程安全問題,如數(shù)據(jù)競(jìng)爭(zhēng)、可見性

    2024年02月14日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包