国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Java 】從源碼全面解析Java 線程池

這篇具有很好參考價(jià)值的文章主要介紹了【Java 】從源碼全面解析Java 線程池。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、引言

線程池技術(shù)在互聯(lián)網(wǎng)技術(shù)使用如此廣泛,幾乎所有的后端技術(shù)面試官都要在線程池技術(shù)的使用和原理方面對(duì)小伙伴們進(jìn)行 360° 的刁難。

作為一個(gè)在互聯(lián)網(wǎng)公司面一次拿一次 Offer 的面霸,打敗了無(wú)數(shù)競(jìng)爭(zhēng)對(duì)手,每次都只能看到無(wú)數(shù)落寞的身影失望的離開(kāi),略感愧疚(請(qǐng)?jiān)试S我使用一下夸張的修辭手法)。

希望能幫助各位讀者以后面試勢(shì)如破竹,對(duì)面試官進(jìn)行 360° 的反擊,吊打問(wèn)你的面試官,讓一同面試的同僚瞠目結(jié)舌,瘋狂收割大廠 Offer!

雖然現(xiàn)在是互聯(lián)網(wǎng)寒冬,但乾坤未定,你我皆是黑馬。

二、使用

我想大部分人應(yīng)該都使用過(guò)線程池,我們的 JDK 中也提供了一些包裝好的線程池使用,比如:

  • newFixedThreadPool:返回一個(gè)核心線程數(shù)為 nThreads 的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor:返回一個(gè)核心線程數(shù)為 1 的線程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
  • newCachedThreadPool:大同小異
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

通過(guò)上面 JDK 提供的我們可以發(fā)現(xiàn)一個(gè)共識(shí),他們其實(shí)都是調(diào)用了 ThreadPoolExecutor 的構(gòu)造方法來(lái)進(jìn)行線程池的創(chuàng)建。

這時(shí)候,我們不免有疑問(wèn),我們難道不可以直接使用 ThreadPoolExecutor 的構(gòu)造方法去進(jìn)行創(chuàng)建嘛

是的,阿里巴巴Java開(kāi)發(fā)手冊(cè)中明確指出,『不允許』使用Executors創(chuàng)建線程池

所以,我們?cè)谏a(chǎn)中,一般使用 ThreadPoolExecutor 的構(gòu)造方法自定義去創(chuàng)建線程池,比如:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,      // 核心線程數(shù)
                5,  // 最大線程數(shù)
                200,   // 非核心工作線程在阻塞隊(duì)列位置等待的時(shí)間
                TimeUnit.SECONDS,  // 非核心工作線程在阻塞隊(duì)列位置等待的單位
                new LinkedBlockingQueue<>(), // 阻塞隊(duì)列,存放任務(wù)的地方
                new ThreadPoolExecutor.AbortPolicy() // 拒絕策略:這里有四種
        );

        for (int i = 0; i < 10; i++) {
            MyTask task = new MyTask();
            executor.execute(task);
        }

        // 關(guān)閉線程
        executor.shutdown();

    }
}

class MyTask implements Runnable {
    @Override
    public void run() {
        System.out.println("我被執(zhí)行了....");
    }
}

三、源碼

整體的流程如下:
【Java 】從源碼全面解析Java 線程池

1、初始化

聊源碼不從初始化聊的,都是不講道理的

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

// 執(zhí)行ThreadPoolExecutor的構(gòu)造方法進(jìn)行初始化
// corePoolSize: 核心線程數(shù)
// maximumPoolSize: 最大線程數(shù)
// keepAliveTime: 非核心工作線程在阻塞隊(duì)列位置等待的時(shí)間
// unit: 非核心工作線程在阻塞隊(duì)列位置等待的時(shí)間單位
// workQueue: 存放任務(wù)的阻塞隊(duì)列
// threadFactory: 線程工廠(生產(chǎn)線程的地方)
// RejectedExecutionHandler: 拒絕策略
public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 核心線程數(shù)可以為0
    // 最大線程數(shù)不為0
    // 最大線程數(shù) 大于 核心線程數(shù)
    // 等待時(shí)間大于等于0
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 將當(dāng)前的入?yún)①x值給成員變量
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

我們上面初始化的過(guò)程主要對(duì)入?yún)⒆隽艘恍┬r?yàn),然后將方法的入?yún)①x予給成員變量

1.1 拒絕策略

1.1.1 AbortPolicy

簡(jiǎn)單粗暴,直接拋出異常

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

1.1.2 CallerRunsPolicy

當(dāng)前拒絕策略會(huì)在線程池?zé)o法處理任務(wù)時(shí),將任務(wù)交給調(diào)用者處理

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果當(dāng)前的
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

1.1.3 DiscardOldestPolicy

如果當(dāng)前的阻塞隊(duì)列滿了,彈出時(shí)間最久的

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // 獲取阻塞隊(duì)列,彈出一個(gè)時(shí)間最久的
            e.getQueue().poll();
            // 執(zhí)行當(dāng)前的
            e.execute(r);
        }
    }
}

1.1.4 DiscardPolicy

簡(jiǎn)單粗暴,不做任何操作

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

1.1.5 自定義拒絕策略

自己寫(xiě)的業(yè)務(wù)邏輯,可以將拒絕的任務(wù)放至數(shù)據(jù)庫(kù)等存儲(chǔ),等后續(xù)在執(zhí)行
public static class MyRejectedExecution implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("這是我自己的拒絕策略");
    }
}

1.2 其余變量

// 該數(shù)值代表兩個(gè)意思:
// 高3位表示當(dāng)前線程池的狀態(tài)
// 低29位表示當(dāng)前線程池工作線程的個(gè)數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//  COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是當(dāng)前工作線程能記錄的工作線程的最大個(gè)數(shù)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 111:代表RUNNING狀態(tài),RUNNING可以處理任務(wù),并且處理阻塞隊(duì)列中的任務(wù)。
private static final int RUNNING    = -1 << COUNT_BITS;
// 000:代表SHUTDOWN狀態(tài),不會(huì)接收新任務(wù),正在處理的任務(wù)正常進(jìn)行,阻塞隊(duì)列的任務(wù)也會(huì)做完。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001:代表STOP狀態(tài),不會(huì)接收新任務(wù),正在處理任務(wù)的線程會(huì)被中斷,阻塞隊(duì)列的任務(wù)一個(gè)不管。
private static final int STOP       =  1 << COUNT_BITS;
// 010:代表TIDYING狀態(tài),這個(gè)狀態(tài)是否SHUTDOWN或者STOP轉(zhuǎn)換過(guò)來(lái)的,代表當(dāng)前線程池馬上關(guān)閉,就是過(guò)渡狀態(tài)。
private static final int TIDYING    =  2 << COUNT_BITS;
// 011:代表TERMINATED狀態(tài),這個(gè)狀態(tài)是TIDYING狀態(tài)轉(zhuǎn)換過(guò)來(lái)的,轉(zhuǎn)換過(guò)來(lái)只需要執(zhí)行一個(gè)terminated方法。
private static final int TERMINATED =  3 << COUNT_BITS;

// 基于&運(yùn)算的特點(diǎn),保證只會(huì)拿到ctl高三位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&運(yùn)算的特點(diǎn),保證只會(huì)拿到ctl低29位的值
private static int workerCountOf(int c)  { return c & CAPACITY; }

線程池的狀態(tài)變化流程圖:

【Java 】從源碼全面解析Java 線程池

2、線程池的execute方法

  • Step1:當(dāng)前的線程池個(gè)數(shù)低于核心線程數(shù),直接添加核心線程即可
  • Step2:當(dāng)前的線程池個(gè)數(shù)大于核心線程數(shù),將任務(wù)添加至阻塞隊(duì)列中
  • Step3:如果添加阻塞隊(duì)列失敗,則需要添加非核心線程數(shù)處理任務(wù)
  • Step4:如果添加非核心線程數(shù)失敗(滿了),執(zhí)行拒絕策略
public void execute(Runnable command) {
    // 如果當(dāng)前傳過(guò)來(lái)的任務(wù)是null,直接拋出異常即可
    if (command == null)
        throw new NullPointerException();
    // 獲取當(dāng)前的數(shù)據(jù)值
    int c = ctl.get();
    
//==========================線程池第一階段:?jiǎn)?dòng)核心線程數(shù)開(kāi)始==================================================
    // Step1:獲取ctl低29位的數(shù)值,與我們的核心線程數(shù)相比
    if (workerCountOf(c) < corePoolSize) {
        // Step2:添加一個(gè)核心線程
        if (addWorker(command, true)){
            return;
        }
        // 更新一下當(dāng)前值
        c = ctl.get();
    }
//==========================線程池第一階段:?jiǎn)?dòng)核心線程數(shù)結(jié)束==================================================
    
    // 如果走到下面會(huì)有兩種情況:
    // 1、核心線程數(shù)滿了,需要往阻塞隊(duì)列里面扔任務(wù)
    // 2、核心線程數(shù)滿了,阻塞隊(duì)列也滿了,執(zhí)行拒絕策略
    
//==========================線程池第二階段:任務(wù)放至阻塞隊(duì)列開(kāi)始==================================================
    // 判斷當(dāng)前的狀態(tài)是不是Running的狀態(tài)(RUNNING可以處理任務(wù),并且處理阻塞隊(duì)列中的任務(wù))
    // 如果是Running的狀態(tài),則可以將任務(wù)放至阻塞隊(duì)列中
    // 這里如果放阻塞隊(duì)列失敗了,證明阻塞隊(duì)列滿了
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次更新數(shù)值
        int recheck = ctl.get();
        // 再次校驗(yàn)當(dāng)前的線程池狀態(tài)是不是Running
        // 如果線程池狀態(tài)不是Running的話,需要?jiǎng)h除掉剛剛放的任務(wù)
        if (!isRunning(recheck) && remove(command)){
            // 執(zhí)行拒絕策略
            reject(command);
        } 
        // 如果到這里,說(shuō)明上面阻塞隊(duì)列中已經(jīng)有數(shù)據(jù)了
        // 如果線程池的個(gè)數(shù)為0的話,需要?jiǎng)?chuàng)建一個(gè)非核心工作線程去執(zhí)行該任務(wù)
        // 不能讓人家堵塞著
        else if (workerCountOf(recheck) == 0){
            addWorker(null, false);
        }
    }
//==========================線程池第二階段:任務(wù)放至阻塞隊(duì)列結(jié)束==================================================

    // 如果走到這里的邏輯,證明上面的邏輯沒(méi)走通,有以下兩種情況:
    // 1、線程池的狀態(tài)不是Running
    //    1.1 如果是這種情況,下面的添加非核心工作線程失敗執(zhí)行拒絕策略,但這個(gè)并不是這個(gè)邏輯的重點(diǎn)
    // 2、阻塞隊(duì)列添加任務(wù)失敗(阻塞隊(duì)列滿了)
    //    2.1 這種情況才是我們需要關(guān)心的
    //    2.2 阻塞隊(duì)列滿了,添加非核心工作線程
    //    2.3 若添加非核心工作線程失敗,證明已經(jīng)到達(dá)maximumPoolSize的限制,執(zhí)行拒絕策略
//==========================線程池第三階段:?jiǎn)?dòng)非核心線程數(shù)開(kāi)始==================================================
    // 添加一個(gè)非核心工作線程
    else if (!addWorker(command, false))
        // 工作隊(duì)列中添加任務(wù)失敗,執(zhí)行拒絕策略
        reject(command);
//==========================線程池第三階段:?jiǎn)?dòng)非核心線程數(shù)結(jié)束==================================================
}

流程圖如下:
【Java 】從源碼全面解析Java 線程池

3、線程池的addWorker方法

3.1 校驗(yàn)

  • 校驗(yàn)當(dāng)前線程池的狀態(tài)
  • 校驗(yàn)當(dāng)前線程池工作線程的個(gè)數(shù)(核心線程數(shù)、最大工作線程數(shù))
private boolean addWorker(Runnable firstTask, boolean core) {
    // 這里主要是為了結(jié)束整個(gè)循環(huán)
    retry:
    for (;;) {
        // 獲取當(dāng)前線程池的數(shù)值(ctl)
        int c = ctl.get();
        
        // runStateOf:基于&運(yùn)算的特點(diǎn),保證只會(huì)拿到ctl高三位的值
        int rs = runStateOf(c);

//==========================線程池狀態(tài)判斷=============================================================
        // rs >= SHUTDOWN:代表當(dāng)前線程池狀態(tài)為:SHUTDOWN、STOP、TIDYING、TERMINATED,線程池狀態(tài)異常
        // 但這里SHUTDOWN狀態(tài)稍許不同(不會(huì)接收新任務(wù),正在處理的任務(wù)正常進(jìn)行,阻塞隊(duì)列的任務(wù)也會(huì)做完)
        // 如果當(dāng)前的狀態(tài)是SHUTDOWN狀態(tài)并且阻塞隊(duì)列任務(wù)不為空且新任務(wù)為空
        // 需要新起一個(gè)非核心工作線程去執(zhí)行任務(wù)
        // 如果不是前面的,直接返回false即可
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
            return false;
        }

//==========================工作線程個(gè)數(shù)判斷==========================================================
        for (;;) {
            // 獲取當(dāng)前線程池中線程的個(gè)數(shù)
            int wc = workerCountOf(c);
            // 1、如果線程池線程的個(gè)數(shù)是否超過(guò)了工作線程的最大個(gè)數(shù)
            // 2、core=true(核心線程)=false(工作線程)
            // 2.1 根據(jù)當(dāng)前core判斷創(chuàng)建的是核心線程數(shù)(corePoolSize)還是非核心線程數(shù)(maximumPoolSize)
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                return false;
            }
            // 嘗試將線程池線程加一
            if (compareAndIncrementWorkerCount(c)){
                // CAS成功后,直接退出外層循環(huán),代表可以執(zhí)行添加工作線程操作了。
                break retry;
            }
            
            // 獲取當(dāng)前線程池的數(shù)值(ctl)
            c = ctl.get(); 
            // 獲取當(dāng)前線程池的狀態(tài)
            // 判斷當(dāng)前線程池的狀態(tài)等不等于我們上面的rs
            // 我們線程池的狀態(tài)被人更改了,需要重新跑整個(gè)for循環(huán)判斷邏輯
            if (runStateOf(c) != rs){
                continue retry;
            }
        }
    }
    // 省略下面的代碼
}

3.2 添加線程

{
    // 省略校驗(yàn)的步驟
    
    // 兩個(gè)標(biāo)記
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 將當(dāng)前的任務(wù)封裝成Worker
        w = new Worker(firstTask);
        // 拿到當(dāng)前Worker的線程
        final Thread t = w.thread;
        // 線程不為空
        if (t != null) {
            // 上鎖,保證線程安全(workers、largestPoolSize)
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                
                // 獲取線程池的狀態(tài)
                int rs = runStateOf(ctl.get());

                // 1、rs < SHUTDOWN:保證當(dāng)前線程池的狀態(tài)一定是RUNNING狀態(tài)
                // 2、rs == SHUTDOWN && firstTask == null:如果當(dāng)前線程池是SHUTDOWN狀態(tài)且新任務(wù)為空
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // 判斷線程是否還活著
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // private final HashSet<Worker> workers = new HashSet<Worker>();
                    // 添加到我們的work隊(duì)列中
                    workers.add(w);
                    // 獲取works的大小
                    int s = workers.size();
                    // largestPoolSize在記錄最大線程個(gè)數(shù)的記錄
                    // 如果當(dāng)前的大小比最大的還要打,替換即可
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // worker添加成功
                    workerAdded = true;
                }
            } finally {
                // 解鎖
                mainLock.unlock();
            }
            // 如果添加成功
            if (workerAdded) {
                // 啟動(dòng)線程
                t.start();
                // 線程啟動(dòng)標(biāo)志位
                workerStarted = true;
            }
        }
    } finally {
        // 如果線程沒(méi)有啟動(dòng)成功,從workers集合中刪除掉該worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回線程是否啟動(dòng)成功
    return workerStarted;
}

// Worker的初始化
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 從線程工廠里面拿一個(gè)線程出來(lái)
    this.thread = getThreadFactory().newThread(this);
}

// 從workers集合中刪除掉該worker
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

4、線程池的 worker 源碼

// Worker繼承了AQS,目的就是為了控制工作線程的中斷。
// Worker實(shí)現(xiàn)了Runnable,內(nèi)部的Thread對(duì)象,在執(zhí)行start時(shí),必然要執(zhí)行Worker中斷額一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    // 當(dāng)前線程工廠創(chuàng)建的線程(也是執(zhí)行任務(wù)使用的線程)
    final Thread thread;
    
    // 當(dāng)前的第一個(gè)任務(wù)
    Runnable firstTask;
  
    // 記錄執(zhí)行了多少個(gè)任務(wù)
    volatile long completedTasks;

    // 構(gòu)造方法
    Worker(Runnable firstTask) {
        // 將State設(shè)置為-1,代表當(dāng)前不允許中斷線程
        setState(-1); 
        // 設(shè)置任務(wù)
        this.firstTask = firstTask;
        // 設(shè)置線程
        this.thread = getThreadFactory().newThread(this);
    }

    // 線程啟動(dòng)執(zhí)行的方法
    public void run() {
        runWorker(this);
    }
    
    // =======================Worker管理中斷================================   
    // 當(dāng)前方法是中斷工作線程時(shí),執(zhí)行的方法
    void interruptIfStarted() {
        Thread t;
        // 只有Worker中的state >= 0的時(shí)候,可以中斷工作線程
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // 如果狀態(tài)正常,并且線程未中斷,這邊就中斷線程
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    } 
}

5、線程池的 runWorker 方法

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // 拿到當(dāng)前的線程
    Thread wt = Thread.currentThread();
    // 拿到當(dāng)前Worker的第一個(gè)任務(wù)(如果攜帶的話)
    Runnable task = w.firstTask;
    // 置空
    w.firstTask = null;
    // 解鎖
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        // 如果任務(wù)不等于空 或者 從阻塞隊(duì)列中拿到的任務(wù)不等于空
        while (task != null || (task = getTask()) != null) {
            // 加鎖
            w.lock();
            // 如果線程池狀態(tài) >= STOP,確保線程中斷了
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 在任務(wù)執(zhí)行前做一些操作,自己實(shí)現(xiàn)的鉤子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 任務(wù)執(zhí)行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任務(wù)執(zhí)行后一些操作:自己實(shí)現(xiàn)的鉤子
                    afterExecute(task, thrown);
                }
            } finally {
                // 任務(wù)置空
                task = null;
                // 執(zhí)行任務(wù)+1
                w.completedTasks++;
                // 解鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 刪除線程的方法
        processWorkerExit(w, completedAbruptly);
    }
}

6、線程池的 getTask 方法

private Runnable getTask() {
    // 超時(shí)的標(biāo)記
    boolean timedOut = false; 

    // 死循環(huán)拿數(shù)據(jù)
    for (;;) {
        // 拿到當(dāng)前的ctl
        int c = ctl.get();
        // 獲取其線程池狀態(tài)
        int rs = runStateOf(c);

        // 如果線程池狀態(tài)是STOP,沒(méi)有必要處理阻塞隊(duì)列任務(wù),直接返回null
        // 如果線程池狀態(tài)是SHUTDOWN,并且阻塞隊(duì)列是空的,直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 線程池中的線程個(gè)數(shù)減一
            decrementWorkerCount();
            return null;
        }

        // 當(dāng)前線程池中線程個(gè)數(shù)
        int wc = workerCountOf(c);

        // 這里是個(gè)重點(diǎn)
        // allowCoreThreadTimeOut:是否允許核心線程數(shù)超時(shí)(開(kāi)啟這個(gè)之后),核心線程數(shù)也會(huì)執(zhí)行下面超時(shí)的邏輯
        // wc > corePoolSize:當(dāng)前線程池中的線程個(gè)數(shù)大于核心線程數(shù)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // wc > maximumPoolSize:基本不存在
        // timed && timedOut:第一次肯定是失敗的(超時(shí)標(biāo)記為false)
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 1、線程個(gè)數(shù)為1
            // 2、阻塞隊(duì)列是空的
            && (wc > 1 || workQueue.isEmpty())) {
            // 線程池的線程個(gè)數(shù)減一
            if (compareAndDecrementWorkerCount(c)){
                return null;
            }
            continue;
        }

        try {
            // 根據(jù)我們前面的timed的值(當(dāng)前線程池中的線程個(gè)數(shù)是否大于核心線程數(shù))
            // 如果大于,執(zhí)行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)帶有時(shí)間的等待,超過(guò)時(shí)間無(wú)任務(wù),會(huì)返回null
            // 如果小于,執(zhí)行workQueue.take(),死等任務(wù),不會(huì)返回null
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null){
                return r;
            }
            // 到這里,說(shuō)明上面的等待超時(shí)了
            // 這里要注意一下,如果這里超時(shí)后,我們上面 if ((wc > maximumPoolSize || (timed && timedOut)) 這個(gè)判斷要起作用了
        	// (timed && timedOut) true
            // wc > 1 || workQueue.isEmpty():當(dāng)線程大于1或者阻塞隊(duì)列無(wú)數(shù)據(jù),直接返回null,讓外部循環(huán)刪除
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

7、線程池的 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 正常退出的:completedAbruptly=false
    // 不是正常退出的:completedAbruptly=true
    if (completedAbruptly) 
        decrementWorkerCount();

    // 加鎖——上鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 將當(dāng)前worker的執(zhí)行任務(wù)數(shù)累加到線程池中
        completedTaskCount += w.completedTasks;
        // 線程池刪除該工作線程
        workers.remove(w);
    } finally {
        // 解鎖
        mainLock.unlock();
    }

    tryTerminate();

    // 獲取ctl的數(shù)據(jù)
    int c = ctl.get();
    // 這里只有SHUTDOWN、RUNNING會(huì)進(jìn)入判斷
    if (runStateLessThan(c, STOP)) {
        // 正常退出的
        if (!completedAbruptly) {
            // 是否允許超時(shí)
            // 允許:0
            // 不允許:核心線程數(shù)
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min=0并且阻塞隊(duì)列不為空
            // 將min設(shè)置成1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 當(dāng)前線程池的大小大于最小值,直接返回即可
            if (workerCountOf(c) >= min){
                return;
            }
        }
        // 如果沒(méi)有的話,說(shuō)明線程池中沒(méi)有線程了,并且還有阻塞任務(wù)
        // 只能添加一個(gè)非核心線程去處理這些任務(wù)
        addWorker(null, false);
    }
}

8、線程池的關(guān)閉方法

8.1 shutdownNow 方法

  • 將線程池狀態(tài)修改為Stop(不會(huì)接收新任務(wù),正在處理任務(wù)的線程會(huì)被中斷,阻塞隊(duì)列的任務(wù)一個(gè)不管)
  • 將線程池中的線程全部中斷
  • 刪除當(dāng)前線程池所有的工作線程
  • 將線程池的狀態(tài)從:Stop --> TIDYING --> TERMINATED,正式標(biāo)記線程池的結(jié)束(喚醒一下等待的主線程)
public List<Runnable> shutdownNow() {
    // 聲明返回結(jié)果
    List<Runnable> tasks;
    // 加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 將線程池狀態(tài)修改為STOP
        advanceRunState(STOP);
        // 將線程池中的線程全部中斷
        interruptWorkers();
        // 刪除當(dāng)前所有的工作線程
        tasks = drainQueue();
    } finally {
        // 解鎖
        mainLock.unlock();
    }
    // 查看當(dāng)前線程池是否可以變?yōu)門ERMINATED狀態(tài)
    // 從 Stop 狀態(tài)修改為 TIDYING,在修改為 TERMINATED
    tryTerminate();
    return tasks;
}

// targetState = STOP
// 作用:將當(dāng)前線程池的狀態(tài)修改為Stop
private void advanceRunState(int targetState) {
    // 進(jìn)來(lái)直接死循環(huán)
    for (;;) {
        // 拿到當(dāng)前的ctl
        int c = ctl.get();
        // runStateAtLeast(c, targetState):當(dāng)前的c是不是大于STOP(如果大于Stop的話,說(shuō)明線程池狀態(tài)已經(jīng)G了
        // 基于CAS,將ctl從c修改為Stop狀態(tài),不修改工作線程個(gè)數(shù),僅僅將狀態(tài)修改為Stop
        // 如果可以修改成功,直接退出即可
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// 將線程池中的線程全部中斷
private void interruptWorkers() {
    // 加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 循環(huán)遍歷線程組
        for (Worker w : workers)
            // 中斷線程
           	// 這里會(huì)給線程打一個(gè)中斷的標(biāo)記,具體什么時(shí)候中斷線程,需要我們自己去控制
            w.interruptIfStarted();
    } finally {
        // 解鎖
        mainLock.unlock();
    }
}

// 刪除當(dāng)前所有的工作線程
private List<Runnable> drainQueue() {
    // 存放工作線程的隊(duì)列
    BlockingQueue<Runnable> q = workQueue;
    // 返回的結(jié)果
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // 清空阻塞隊(duì)列并將數(shù)據(jù)放入taskList中
    q.drainTo(taskList);
    // 校驗(yàn)當(dāng)前的數(shù)據(jù)是夠真的清空
    if (!q.isEmpty()) {
        // 如果確實(shí)有遺漏的,畢竟這哥們也沒(méi)上鎖
        // 手動(dòng)的將線程從workQueue刪除掉并且放到taskList中
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    // 最終返回即可
    return taskList;
}

final void tryTerminate() {
    for (;;) {
        // 拿到ctl
        int c = ctl.get();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS將當(dāng)前的ctl設(shè)置成TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 該方法是一個(gè)鉤子函數(shù),我們自己定義,在線程池銷毀之前做最后的處理
                    terminated();
                } finally {
                    // 將ctl設(shè)置成TERMINATED標(biāo)志著線程池的正式結(jié)束
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 線程池提供了一個(gè)方法,主線程在提交任務(wù)到線程池后,是可以繼續(xù)做其他操作的。
                    // 咱們也可以讓主線程提交任務(wù)后,等待線程池處理完畢,再做后續(xù)操作
                    // 這里線程池涼涼后,要喚醒哪些調(diào)用了awaitTermination方法的線程
                    // 簡(jiǎn)單來(lái)說(shuō),當(dāng)時(shí)等待線程池返回的主線程,由于線程池已經(jīng)銷毀了,他們也必須要喚醒
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 解鎖
            mainLock.unlock();
        }
    }
}

8.2 shutdown 方法

public void shutdown() {
    // 加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 將線程池狀態(tài)修改為SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 將線程池中的線程全部中斷
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 查看當(dāng)前線程池是否可以變?yōu)門ERMINATED狀態(tài)
    // 從 SHUTDOWN 狀態(tài)修改為 TIDYING,在修改為 TERMINATED
    tryTerminate();
}

四、流程圖

【Java 】從源碼全面解析Java 線程池

五、總結(jié)

魯迅先生曾說(shuō):獨(dú)行難,眾行易,和志同道合的人一起進(jìn)步。彼此毫無(wú)保留的分享經(jīng)驗(yàn),才是對(duì)抗互聯(lián)網(wǎng)寒冬的最佳選擇。

其實(shí)很多時(shí)候,并不是我們不夠努力,很可能就是自己努力的方向不對(duì),如果有一個(gè)人能稍微指點(diǎn)你一下,你真的可能會(huì)少走幾年彎路。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-438624.html

到了這里,關(guān)于【Java 】從源碼全面解析Java 線程池的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包