- ??作者簡介:大家好,我是愛敲代碼的小黃,獨角獸企業(yè)的Java開發(fā)工程師,CSDN博客專家,阿里云專家博主
- ??系列專欄:Java設計模式、Spring源碼系列、Netty源碼系列、Kafka源碼系列、JUC源碼系列
- ??如果感覺博主的文章還不錯的話,請??三連支持??一下博主哦
- ??博主正在努力完成2023計劃中:以夢為馬,揚帆起航,2023追夢人
- ??聯(lián)系方式:hls1793929520,加我進群,大家一起學習,一起進步,一起對抗互聯(lián)網(wǎng)寒冬??
線程池源碼剖析
一、引言
線程池技術在互聯(lián)網(wǎng)技術使用如此廣泛,幾乎所有的后端技術面試官都要在線程池技術的使用和原理方面對小伙伴們進行 360° 的刁難。
作為一個在互聯(lián)網(wǎng)公司面一次拿一次 Offer 的面霸,打敗了無數(shù)競爭對手,每次都只能看到無數(shù)落寞的身影失望的離開,略感愧疚(請允許我使用一下夸張的修辭手法)。
于是在一個寂寞難耐的夜晚,暖男我痛定思痛,決定開始寫 《吊打面試官》 系列,希望能幫助各位讀者以后面試勢如破竹,對面試官進行 360° 的反擊,吊打問你的面試官,讓一同面試的同僚瞠目結舌,瘋狂收割大廠 Offer!
雖然現(xiàn)在是互聯(lián)網(wǎng)寒冬,但乾坤未定,你我皆是黑馬
二、使用
我想大部分人應該都使用過線程池,我們的 JDK
中也提供了一些包裝好的線程池使用,比如:
-
newFixedThreadPool:返回一個核心線程數(shù)為
nThreads
的線程池public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
newSingleThreadExecutor:返回一個核心線程數(shù)為
1
的線程池public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); }
-
newCachedThreadPool:大同小異
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
通過上面 JDK
提供的我們可以發(fā)現(xiàn)一個共識,他們其實都是調(diào)用了 ThreadPoolExecutor
的構造方法來進行線程池的創(chuàng)建
這時候,我們不免有疑問,我們難道不可以直接使用 ThreadPoolExecutor
的構造方法去進行創(chuàng)建嘛
是的,阿里巴巴Java開發(fā)手冊中明確指出,『不允許』使用Executors創(chuàng)建線程池
所以,我們在生產(chǎn)中,一般使用 ThreadPoolExecutor
的構造方法自定義去創(chuàng)建線程池,比如:
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心線程數(shù)
5, // 最大線程數(shù)
200, // 非核心工作線程在阻塞隊列位置等待的時間
TimeUnit.SECONDS, // 非核心工作線程在阻塞隊列位置等待的單位
new LinkedBlockingQueue<>(), // 阻塞隊列,存放任務的地方
new ThreadPoolExecutor.AbortPolicy() // 拒絕策略:這里有四種
);
for (int i = 0; i < 10; i++) {
MyTask task = new MyTask();
executor.execute(task);
}
// 關閉線程
executor.shutdown();
}
}
class MyTask implements Runnable {
@Override
public void run() {
System.out.println("我被執(zhí)行了....");
}
}
三、源碼
整體的流程如下:
1、初始化
聊源碼不從初始化聊的,都是不講道理的
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 執(zhí)行ThreadPoolExecutor的構造方法進行初始化
// corePoolSize: 核心線程數(shù)
// maximumPoolSize: 最大線程數(shù)
// keepAliveTime: 非核心工作線程在阻塞隊列位置等待的時間
// unit: 非核心工作線程在阻塞隊列位置等待的時間單位
// workQueue: 存放任務的阻塞隊列
// threadFactory: 線程工廠(生產(chǎn)線程的地方)
// RejectedExecutionHandler: 拒絕策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 核心線程數(shù)可以為0
// 最大線程數(shù)不為0
// 最大線程數(shù) 大于 核心線程數(shù)
// 等待時間大于等于0
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 將當前的入?yún)①x值給成員變量
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
我們上面初始化的過程主要對入?yún)⒆隽艘恍┬r灒缓髮⒎椒ǖ娜雲(yún)①x予給成員變量
1.1 拒絕策略
1.1.1 AbortPolicy
簡單粗暴,直接拋出異常
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
1.1.2 CallerRunsPolicy
當前拒絕策略會在線程池無法處理任務時,將任務交給調(diào)用者處理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果當前的
if (!e.isShutdown()) {
r.run();
}
}
}
1.1.3 DiscardOldestPolicy
如果當前的阻塞隊列滿了,彈出時間最久的
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 獲取阻塞隊列,彈出一個時間最久的
e.getQueue().poll();
// 執(zhí)行當前的
e.execute(r);
}
}
}
1.1.4 DiscardPolicy
簡單粗暴,不做任何操作
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
1.1.5 自定義拒絕策略
自己寫的業(yè)務邏輯,可以將拒絕的任務放至數(shù)據(jù)庫等存儲,等后續(xù)在執(zhí)行
public static class MyRejectedExecution implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("這是我自己的拒絕策略");
}
}
1.2 其余變量
// 該數(shù)值代表兩個意思:
// 高3位表示當前線程池的狀態(tài)
// 低29位表示當前線程池工作線程的個數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是當前工作線程能記錄的工作線程的最大個數(shù)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 111:代表RUNNING狀態(tài),RUNNING可以處理任務,并且處理阻塞隊列中的任務。
private static final int RUNNING = -1 << COUNT_BITS;
// 000:代表SHUTDOWN狀態(tài),不會接收新任務,正在處理的任務正常進行,阻塞隊列的任務也會做完。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001:代表STOP狀態(tài),不會接收新任務,正在處理任務的線程會被中斷,阻塞隊列的任務一個不管。
private static final int STOP = 1 << COUNT_BITS;
// 010:代表TIDYING狀態(tài),這個狀態(tài)是否SHUTDOWN或者STOP轉換過來的,代表當前線程池馬上關閉,就是過渡狀態(tài)。
private static final int TIDYING = 2 << COUNT_BITS;
// 011:代表TERMINATED狀態(tài),這個狀態(tài)是TIDYING狀態(tài)轉換過來的,轉換過來只需要執(zhí)行一個terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS;
// 基于&運算的特點,保證只會拿到ctl高三位的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 基于&運算的特點,保證只會拿到ctl低29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
線程池的狀態(tài)變化流程圖:
2、線程池的execute方法
- Step1:當前的線程池個數(shù)低于核心線程數(shù),直接添加核心線程即可
- Step2:當前的線程池個數(shù)大于核心線程數(shù),將任務添加至阻塞隊列中
- Step3:如果添加阻塞隊列失敗,則需要添加非核心線程數(shù)處理任務
- Step4:如果添加非核心線程數(shù)失敗(滿了),執(zhí)行拒絕策略
public void execute(Runnable command) {
// 如果當前傳過來的任務是null,直接拋出異常即可
if (command == null)
throw new NullPointerException();
// 獲取當前的數(shù)據(jù)值
int c = ctl.get();
//==========================線程池第一階段:啟動核心線程數(shù)開始==================================================
// Step1:獲取ctl低29位的數(shù)值,與我們的核心線程數(shù)相比
if (workerCountOf(c) < corePoolSize) {
// Step2:添加一個核心線程
if (addWorker(command, true)){
return;
}
// 更新一下當前值
c = ctl.get();
}
//==========================線程池第一階段:啟動核心線程數(shù)結束==================================================
// 如果走到下面會有兩種情況:
// 1、核心線程數(shù)滿了,需要往阻塞隊列里面扔任務
// 2、核心線程數(shù)滿了,阻塞隊列也滿了,執(zhí)行拒絕策略
//==========================線程池第二階段:任務放至阻塞隊列開始==================================================
// 判斷當前的狀態(tài)是不是Running的狀態(tài)(RUNNING可以處理任務,并且處理阻塞隊列中的任務)
// 如果是Running的狀態(tài),則可以將任務放至阻塞隊列中
// 這里如果放阻塞隊列失敗了,證明阻塞隊列滿了
if (isRunning(c) && workQueue.offer(command)) {
// 再次更新數(shù)值
int recheck = ctl.get();
// 再次校驗當前的線程池狀態(tài)是不是Running
// 如果線程池狀態(tài)不是Running的話,需要刪除掉剛剛放的任務
if (!isRunning(recheck) && remove(command)){
// 執(zhí)行拒絕策略
reject(command);
}
// 如果到這里,說明上面阻塞隊列中已經(jīng)有數(shù)據(jù)了
// 如果線程池的個數(shù)為0的話,需要創(chuàng)建一個非核心工作線程去執(zhí)行該任務
// 不能讓人家堵塞著
else if (workerCountOf(recheck) == 0){
addWorker(null, false);
}
}
//==========================線程池第二階段:任務放至阻塞隊列結束==================================================
// 如果走到這里的邏輯,證明上面的邏輯沒走通,有以下兩種情況:
// 1、線程池的狀態(tài)不是Running
// 1.1 如果是這種情況,下面的添加非核心工作線程失敗執(zhí)行拒絕策略,但這個并不是這個邏輯的重點
// 2、阻塞隊列添加任務失敗(阻塞隊列滿了)
// 2.1 這種情況才是我們需要關心的
// 2.2 阻塞隊列滿了,添加非核心工作線程
// 2.3 若添加非核心工作線程失敗,證明已經(jīng)到達maximumPoolSize的限制,執(zhí)行拒絕策略
//==========================線程池第三階段:啟動非核心線程數(shù)開始==================================================
// 添加一個非核心工作線程
else if (!addWorker(command, false))
// 工作隊列中添加任務失敗,執(zhí)行拒絕策略
reject(command);
//==========================線程池第三階段:啟動非核心線程數(shù)結束==================================================
}
流程圖如下:
3、線程池的addWorker方法
3.1 校驗
- 校驗當前線程池的狀態(tài)
- 校驗當前線程池工作線程的個數(shù)(核心線程數(shù)、最大工作線程數(shù))
private boolean addWorker(Runnable firstTask, boolean core) {
// 這里主要是為了結束整個循環(huán)
retry:
for (;;) {
// 獲取當前線程池的數(shù)值(ctl)
int c = ctl.get();
// runStateOf:基于&運算的特點,保證只會拿到ctl高三位的值
int rs = runStateOf(c);
//==========================線程池狀態(tài)判斷=============================================================
// rs >= SHUTDOWN:代表當前線程池狀態(tài)為:SHUTDOWN、STOP、TIDYING、TERMINATED,線程池狀態(tài)異常
// 但這里SHUTDOWN狀態(tài)稍許不同(不會接收新任務,正在處理的任務正常進行,阻塞隊列的任務也會做完)
// 如果當前的狀態(tài)是SHUTDOWN狀態(tài)并且阻塞隊列任務不為空且新任務為空
// 需要新起一個非核心工作線程去執(zhí)行任務
// 如果不是前面的,直接返回false即可
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
return false;
}
//==========================工作線程個數(shù)判斷==========================================================
for (;;) {
// 獲取當前線程池中線程的個數(shù)
int wc = workerCountOf(c);
// 1、如果線程池線程的個數(shù)是否超過了工作線程的最大個數(shù)
// 2、core=true(核心線程)=false(工作線程)
// 2.1 根據(jù)當前core判斷創(chuàng)建的是核心線程數(shù)(corePoolSize)還是非核心線程數(shù)(maximumPoolSize)
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
return false;
}
// 嘗試將線程池線程加一
if (compareAndIncrementWorkerCount(c)){
// CAS成功后,直接退出外層循環(huán),代表可以執(zhí)行添加工作線程操作了。
break retry;
}
// 獲取當前線程池的數(shù)值(ctl)
c = ctl.get();
// 獲取當前線程池的狀態(tài)
// 判斷當前線程池的狀態(tài)等不等于我們上面的rs
// 我們線程池的狀態(tài)被人更改了,需要重新跑整個for循環(huán)判斷邏輯
if (runStateOf(c) != rs){
continue retry;
}
}
}
// 省略下面的代碼
}
3.2 添加線程
{
// 省略校驗的步驟
// 兩個標記
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 將當前的任務封裝成Worker
w = new Worker(firstTask);
// 拿到當前Worker的線程
final Thread t = w.thread;
// 線程不為空
if (t != null) {
// 上鎖,保證線程安全(workers、largestPoolSize)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 獲取線程池的狀態(tài)
int rs = runStateOf(ctl.get());
// 1、rs < SHUTDOWN:保證當前線程池的狀態(tài)一定是RUNNING狀態(tài)
// 2、rs == SHUTDOWN && firstTask == null:如果當前線程池是SHUTDOWN狀態(tài)且新任務為空
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// 判斷線程是否還活著
if (t.isAlive())
throw new IllegalThreadStateException();
// private final HashSet<Worker> workers = new HashSet<Worker>();
// 添加到我們的work隊列中
workers.add(w);
// 獲取works的大小
int s = workers.size();
// largestPoolSize在記錄最大線程個數(shù)的記錄
// 如果當前的大小比最大的還要打,替換即可
if (s > largestPoolSize)
largestPoolSize = s;
// worker添加成功
workerAdded = true;
}
} finally {
// 解鎖
mainLock.unlock();
}
// 如果添加成功
if (workerAdded) {
// 啟動線程
t.start();
// 線程啟動標志位
workerStarted = true;
}
}
} finally {
// 如果線程沒有啟動成功,從workers集合中刪除掉該worker
if (!workerStarted)
addWorkerFailed(w);
}
// 返回線程是否啟動成功
return workerStarted;
}
// Worker的初始化
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 從線程工廠里面拿一個線程出來
this.thread = getThreadFactory().newThread(this);
}
// 從workers集合中刪除掉該worker
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
4、線程池的 worker 源碼
// Worker繼承了AQS,目的就是為了控制工作線程的中斷。
// Worker實現(xiàn)了Runnable,內(nèi)部的Thread對象,在執(zhí)行start時,必然要執(zhí)行Worker中斷額一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
// 當前線程工廠創(chuàng)建的線程(也是執(zhí)行任務使用的線程)
final Thread thread;
// 當前的第一個任務
Runnable firstTask;
// 記錄執(zhí)行了多少個任務
volatile long completedTasks;
// 構造方法
Worker(Runnable firstTask) {
// 將State設置為-1,代表當前不允許中斷線程
setState(-1);
// 設置任務
this.firstTask = firstTask;
// 設置線程
this.thread = getThreadFactory().newThread(this);
}
// 線程啟動執(zhí)行的方法
public void run() {
runWorker(this);
}
// =======================Worker管理中斷================================
// 當前方法是中斷工作線程時,執(zhí)行的方法
void interruptIfStarted() {
Thread t;
// 只有Worker中的state >= 0的時候,可以中斷工作線程
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 如果狀態(tài)正常,并且線程未中斷,這邊就中斷線程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
5、線程池的 runWorker 方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
// 拿到當前的線程
Thread wt = Thread.currentThread();
// 拿到當前Worker的第一個任務(如果攜帶的話)
Runnable task = w.firstTask;
// 置空
w.firstTask = null;
// 解鎖
w.unlock();
boolean completedAbruptly = true;
try {
// 如果任務不等于空 或者 從阻塞隊列中拿到的任務不等于空
while (task != null || (task = getTask()) != null) {
// 加鎖
w.lock();
// 如果線程池狀態(tài) >= STOP,確保線程中斷了
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在任務執(zhí)行前做一些操作,自己實現(xiàn)的鉤子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任務執(zhí)行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任務執(zhí)行后一些操作:自己實現(xiàn)的鉤子
afterExecute(task, thrown);
}
} finally {
// 任務置空
task = null;
// 執(zhí)行任務+1
w.completedTasks++;
// 解鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 刪除線程的方法
processWorkerExit(w, completedAbruptly);
}
}
6、線程池的 getTask 方法
private Runnable getTask() {
// 超時的標記
boolean timedOut = false;
// 死循環(huán)拿數(shù)據(jù)
for (;;) {
// 拿到當前的ctl
int c = ctl.get();
// 獲取其線程池狀態(tài)
int rs = runStateOf(c);
// 如果線程池狀態(tài)是STOP,沒有必要處理阻塞隊列任務,直接返回null
// 如果線程池狀態(tài)是SHUTDOWN,并且阻塞隊列是空的,直接返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 線程池中的線程個數(shù)減一
decrementWorkerCount();
return null;
}
// 當前線程池中線程個數(shù)
int wc = workerCountOf(c);
// 這里是個重點
// allowCoreThreadTimeOut:是否允許核心線程數(shù)超時(開啟這個之后),核心線程數(shù)也會執(zhí)行下面超時的邏輯
// wc > corePoolSize:當前線程池中的線程個數(shù)大于核心線程數(shù)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc > maximumPoolSize:基本不存在
// timed && timedOut:第一次肯定是失敗的(超時標記為false)
if ((wc > maximumPoolSize || (timed && timedOut))
// 1、線程個數(shù)為1
// 2、阻塞隊列是空的
&& (wc > 1 || workQueue.isEmpty())) {
// 線程池的線程個數(shù)減一
if (compareAndDecrementWorkerCount(c)){
return null;
}
continue;
}
try {
// 根據(jù)我們前面的timed的值(當前線程池中的線程個數(shù)是否大于核心線程數(shù))
// 如果大于,執(zhí)行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)帶有時間的等待,超過時間無任務,會返回null
// 如果小于,執(zhí)行workQueue.take(),死等任務,不會返回null
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null){
return r;
}
// 到這里,說明上面的等待超時了
// 這里要注意一下,如果這里超時后,我們上面 if ((wc > maximumPoolSize || (timed && timedOut)) 這個判斷要起作用了
// (timed && timedOut) true
// wc > 1 || workQueue.isEmpty():當線程大于1或者阻塞隊列無數(shù)據(jù),直接返回null,讓外部循環(huán)刪除
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
7、線程池的 processWorkerExit 方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 正常退出的:completedAbruptly=false
// 不是正常退出的:completedAbruptly=true
if (completedAbruptly)
decrementWorkerCount();
// 加鎖——上鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將當前worker的執(zhí)行任務數(shù)累加到線程池中
completedTaskCount += w.completedTasks;
// 線程池刪除該工作線程
workers.remove(w);
} finally {
// 解鎖
mainLock.unlock();
}
tryTerminate();
// 獲取ctl的數(shù)據(jù)
int c = ctl.get();
// 這里只有SHUTDOWN、RUNNING會進入判斷
if (runStateLessThan(c, STOP)) {
// 正常退出的
if (!completedAbruptly) {
// 是否允許超時
// 允許:0
// 不允許:核心線程數(shù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min=0并且阻塞隊列不為空
// 將min設置成1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 當前線程池的大小大于最小值,直接返回即可
if (workerCountOf(c) >= min){
return;
}
}
// 如果沒有的話,說明線程池中沒有線程了,并且還有阻塞任務
// 只能添加一個非核心線程去處理這些任務
addWorker(null, false);
}
}
8、線程池的關閉方法
8.1 shutdownNow 方法
- 將線程池狀態(tài)修改為Stop(不會接收新任務,正在處理任務的線程會被中斷,阻塞隊列的任務一個不管)
- 將線程池中的線程全部中斷
- 刪除當前線程池所有的工作線程
- 將線程池的狀態(tài)從:Stop --> TIDYING --> TERMINATED,正式標記線程池的結束(喚醒一下等待的主線程)
public List<Runnable> shutdownNow() {
// 聲明返回結果
List<Runnable> tasks;
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 將線程池狀態(tài)修改為STOP
advanceRunState(STOP);
// 將線程池中的線程全部中斷
interruptWorkers();
// 刪除當前所有的工作線程
tasks = drainQueue();
} finally {
// 解鎖
mainLock.unlock();
}
// 查看當前線程池是否可以變?yōu)門ERMINATED狀態(tài)
// 從 Stop 狀態(tài)修改為 TIDYING,在修改為 TERMINATED
tryTerminate();
return tasks;
}
// targetState = STOP
// 作用:將當前線程池的狀態(tài)修改為Stop
private void advanceRunState(int targetState) {
// 進來直接死循環(huán)
for (;;) {
// 拿到當前的ctl
int c = ctl.get();
// runStateAtLeast(c, targetState):當前的c是不是大于STOP(如果大于Stop的話,說明線程池狀態(tài)已經(jīng)G了
// 基于CAS,將ctl從c修改為Stop狀態(tài),不修改工作線程個數(shù),僅僅將狀態(tài)修改為Stop
// 如果可以修改成功,直接退出即可
if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// 將線程池中的線程全部中斷
private void interruptWorkers() {
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 循環(huán)遍歷線程組
for (Worker w : workers)
// 中斷線程
// 這里會給線程打一個中斷的標記,具體什么時候中斷線程,需要我們自己去控制
w.interruptIfStarted();
} finally {
// 解鎖
mainLock.unlock();
}
}
// 刪除當前所有的工作線程
private List<Runnable> drainQueue() {
// 存放工作線程的隊列
BlockingQueue<Runnable> q = workQueue;
// 返回的結果
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 清空阻塞隊列并將數(shù)據(jù)放入taskList中
q.drainTo(taskList);
// 校驗當前的數(shù)據(jù)是夠真的清空
if (!q.isEmpty()) {
// 如果確實有遺漏的,畢竟這哥們也沒上鎖
// 手動的將線程從workQueue刪除掉并且放到taskList中
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
// 最終返回即可
return taskList;
}
final void tryTerminate() {
for (;;) {
// 拿到ctl
int c = ctl.get();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// CAS將當前的ctl設置成TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 該方法是一個鉤子函數(shù),我們自己定義,在線程池銷毀之前做最后的處理
terminated();
} finally {
// 將ctl設置成TERMINATED標志著線程池的正式結束
ctl.set(ctlOf(TERMINATED, 0));
// 線程池提供了一個方法,主線程在提交任務到線程池后,是可以繼續(xù)做其他操作的。
// 咱們也可以讓主線程提交任務后,等待線程池處理完畢,再做后續(xù)操作
// 這里線程池涼涼后,要喚醒哪些調(diào)用了awaitTermination方法的線程
// 簡單來說,當時等待線程池返回的主線程,由于線程池已經(jīng)銷毀了,他們也必須要喚醒
termination.signalAll();
}
return;
}
} finally {
// 解鎖
mainLock.unlock();
}
}
}
8.2 shutdown 方法
public void shutdown() {
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 將線程池狀態(tài)修改為SHUTDOWN
advanceRunState(SHUTDOWN);
// 將線程池中的線程全部中斷
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 查看當前線程池是否可以變?yōu)門ERMINATED狀態(tài)
// 從 SHUTDOWN 狀態(tài)修改為 TIDYING,在修改為 TERMINATED
tryTerminate();
}
四、流程圖
五、總結
魯迅先生曾說:獨行難,眾行易,和志同道合的人一起進步。彼此毫無保留的分享經(jīng)驗,才是對抗互聯(lián)網(wǎng)寒冬的最佳選擇。
其實很多時候,并不是我們不夠努力,很可能就是自己努力的方向不對,如果有一個人能稍微指點你一下,你真的可能會少走幾年彎路。
如果你也對 后端架構和中間件源碼 有興趣,歡迎添加博主微信:hls1793929520,一起學習,一起成長
我是愛敲代碼的小黃,獨角獸企業(yè)的Java開發(fā)工程師,CSDN博客專家,喜歡后端架構和中間件源碼。
我們下期再見。
我從清晨走過,也擁抱夜晚的星辰,人生沒有捷徑,你我皆平凡,你好,陌生人,一起共勉。文章來源:http://www.zghlxwxcb.cn/news/detail-436397.html
往期文章推薦:文章來源地址http://www.zghlxwxcb.cn/news/detail-436397.html
- 從源碼全面解析LinkedBlockingQueue的來龍去脈
- 從源碼全面解析 ArrayBlockingQueue 的來龍去脈
- 從源碼全面解析ReentrantLock的來龍去脈
- 閱讀完synchronized和ReentrantLock的源碼后,我竟發(fā)現(xiàn)其完全相似
- 從源碼全面解析 ThreadLocal 關鍵字的來龍去脈
- 從源碼全面解析 synchronized 關鍵字的來龍去脈
- 阿里面試官讓我講講volatile,我直接從HotSpot開始講起,一套組合拳拿下面試
到了這里,關于從源碼全面解析Java 線程池的來龍去脈的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!