什么是線程池?
????????線程池主要是為了解決執(zhí)行新任務執(zhí)行時,應用程序為減少為任務創(chuàng)建一個新線程和任務執(zhí)行完畢時銷毀線程所帶來的開銷。通過線程池,可以在項目初始化時就創(chuàng)建一個線程集合,然后在需要執(zhí)行新任務時重用這些線程而不是每次都新建一個線程,一旦任務已經(jīng)完成了,線程回到線程池中并等待下一次分配任務,達到資源復用的效果。
線程池主要優(yōu)勢?
- 降低資源消耗:通過池化技術重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
- 提高響應速度:任務到達時,無需等待線程創(chuàng)建即可立即執(zhí)行。
- 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會消耗系統(tǒng)資源,還會因為線程的不合理分布導致資源調度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進行統(tǒng)一的分配、調優(yōu)和監(jiān)控。
- 提供更多更強大的功能:線程池具備可拓展性,允許開發(fā)人員向其中增加更多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執(zhí)行或定期執(zhí)行。
如何創(chuàng)建線程?
通過Executors 創(chuàng)建線程池 |
newSingleThreadExecutor:創(chuàng)建一個只有一個線程的線程池,串行執(zhí)行所有任務,即使空閑時也不會被關閉??梢员WC所有任務的執(zhí)行順序按照任務的提交順序執(zhí)行。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。 適用場景:需要保證順序地執(zhí)行各個任務;并且在任意時間點,不會有多個線程活動的應用場景。 newFixedThreadPool:創(chuàng)建一個固定線程數(shù)量的線程池(corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊列)。初始化時線程數(shù)量為零,之后每次提交一個任務就創(chuàng)建一個線程,直到線程達到線程池的最大容量。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結束,那么線程池會補充一個新線程。 適用場景:為了滿足資源管理的需求,而需要限制當前線程數(shù)量的應用場景,它適用于負載比較重的服務器。 newCachedThreadPool:創(chuàng)建一個可緩存的線程池,線程的最大數(shù)量為Integer.MAX_VALUE??臻e線程會臨時緩存下來,線程會等待60s還是沒有任務加入的話就會被關閉。 適用場景:適用于執(zhí)行很多的短時間異步任務的小程序,或者是負載較輕的服務器。 newScheduledThreadPool:創(chuàng)建一個支持執(zhí)行延遲任務或者周期性執(zhí)行任務的線程池。 |
ThreadPoolExecutor | 阿里巴巴開發(fā)手冊并發(fā)編程有一條規(guī)定:線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這是為什么呢?主要是因為這樣的可以避免資源耗盡的風險,因為使用Executors返回線程池對象的弊端有: FixedThreadPool 和 SingleThreadPool 允許的阻塞隊列長度為 Integer.MAX_VALUE,這樣會導致堆積大量的請求,從而導致OOM; CachedThreadPool 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導致 OOM。 所以創(chuàng)建線程池,最好是根據(jù)線程池的用途,然后自己創(chuàng)建線程池。 |
目錄
一、線程池系列相關文章
二、繼承實現(xiàn)關系圖
三、低層數(shù)據(jù)存儲結構
3.1 核心屬性
3.1.1 說明
3.1.2 線程池五種狀態(tài)
3.2 構造器
四、工作原理
五、源碼解析
5.1?核心方法execute()
5.2?添加任務addWork()
5.3?執(zhí)行任務runWork()
5.4?阻塞隊列取任務getTask()
5.5 Worker無任務最后處理processWorkerExit()
5.6 關閉線程池
5.6.1?關閉線程池shutdown()
5.6.2?關閉線程池shutdownNow()
5.6.3?嘗試結束線程池tryTerminate()
5.6.4?判斷線程池是否關閉awaitTermination()
一、線程池系列相關文章
? ? ? ? 1.1?Java線程池ThreadPoolExcutor01-參數(shù)說明
? ? ? ? 1.2?Java線程池ThreadPoolExcutor02-阻塞隊列之ArrayBlockingQueue
? ? ? ? 1.3?Java線程池ThreadPoolExcutor03-阻塞隊列之LinkedBlockingQueue
? ? ? ? 1.4?Java線程池ThreadPoolExcutor04-阻塞隊列之PriorityBlockingQueue原理及擴容機制詳解
? ? ? ? 1.5?Java線程池ThreadPoolExcutor05-阻塞隊列之DelayQueue原理及擴容機制詳解
? ? ? ? 1.6?Java線程池ThreadPoolExcutor06-阻塞隊列之SynchronousQueue
? ? ? ? 1.7?Java線程池ThreadPoolExcutor07-阻塞隊列之LinkedTransferQueue
? ? ? ? 1.8?Java線程池ThreadPoolExcutor07-阻塞隊列之LinkedBlockingDeque
? ? ? ? 1.9?Java線程池ThreadPoolExcutor08-4種拒絕策略
二、繼承實現(xiàn)關系圖
三、低層數(shù)據(jù)存儲結構
3.1 核心屬性
public class ThreadPoolExecutor extends AbstractExecutorService {
...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
...
}
3.1.1 說明
屬性名 | 說明 |
COUNT_BITS | 用于計算線程池的狀態(tài)值、容量 |
workQueue | 阻塞隊列。七個: ArrayBlockingQueue:基于數(shù)組結構的有界阻塞隊列,按FIFO排序任務。詳見1.2鏈接。 LinkedBlockingQueue:?基于鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene。詳見1.3鏈接。 PriorityBlockingQueue:具有優(yōu)先級的無界阻塞隊列。詳見1.4鏈接。 DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列,只有在延遲期滿時才能從中提取元素。詳見1.5鏈接。 SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態(tài)。詳見1.6鏈接。 LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。與SynchronousQueue類似,還含有非阻塞方法。詳見1.7鏈接。 LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。詳見1.8鏈接。 |
threadFactory |
線程工廠 |
handler | 拒絕策略處理類。四種:AbortPolicy策略(默認)、DiscardPolicy策略、DiscardOldestPolicy策略 和 CallerRunsPolicy策略。詳見1.9鏈接。 |
keepAliveTime |
為多余的空閑線程等待新任務的最長時間, 超過這個時間后多余的線程將被終止。這里把keepAliveTime設置為0L,意味著多余 的空閑線程會被立即終止。 |
allowCoreThreadTimeOut | 如果為true,核心線程使用keepAliveTime超時等待工作。 如果為false(默認),核心線程即使在空閑時也保持活動。 |
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
3.1.2 線程池五種狀態(tài)
屬性ctl 是ThreadPoolExecutor內部一個用來進行技術和狀態(tài)控制的控制變量,它使用了一個原子整形字段來實現(xiàn)兩個方面的管理:
- 低29位記錄線程池的線程數(shù),
- 高3位記錄線程池的工作狀態(tài)
五種狀態(tài):
- RUNNING:線程池一旦被創(chuàng)建處于RUNNING狀態(tài)。線程池處于RUNNING狀態(tài)時,能夠接收新任務以及對已添加的任務進行處理。
- SHUTDOWN:線程池處于SHUTDOWN狀態(tài)時,不接收新任務,但能處理已添加的任務。調用線程池的shutdown()接口時,線程池由RUNNING狀態(tài)轉變?yōu)镾HUTDOWN狀態(tài)。
- STOP:?線程池處于STOP狀態(tài)時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。調用線程池的shutdownNow()接口時,線程池由RUNNING狀態(tài)或者SHUTDOWN狀態(tài)變?yōu)镾TOP狀態(tài)。
- TIDYING: 當所有的任務已終止,ctl記錄的任務數(shù)為0,線程池的狀態(tài)會變?yōu)門IDYING狀態(tài)。當線程池狀態(tài)為SHUTDOWN時,阻塞隊列為空并且線程池中執(zhí)行的任務也為空時,就會由SHUTDOWN狀態(tài)變?yōu)?TIDYING狀態(tài);當線程池為STOP時,線程池中執(zhí)行的任務為空時,就會又STOP狀態(tài)變?yōu)?TIDYING狀態(tài)。
- TERMINATED:?線程池徹底終止,就會變成TERMINATED狀態(tài)。線程池處于TIDYING狀態(tài)時,調用terminated()就會由TIDYING狀態(tài)變?yōu)門ERMINATED狀態(tài)。
狀態(tài)轉換如下圖:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 圖3.1.2-1
3.2 構造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
參數(shù)名 | 說明 |
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
keepAliveTime | 為多余的空閑線程等待新任務的最長時間, 超過這個時間后多余的線程將被終止。 |
unit | keepAliveTime的單位 |
workQueue | 阻塞隊列 |
threadFactory | 線程工廠 |
handler | 拒絕策略處理類 |
詳見1.1鏈接。
四、工作原理
- 如果任務null直接退出,否則執(zhí)行步驟2;
- 若工作線程數(shù)小于核心線程數(shù),執(zhí)行步驟3創(chuàng)建工作線程并執(zhí)行任務,否則執(zhí)行步驟4。
- 若阻塞隊列已滿
- 工作線程數(shù)小于最大線程數(shù),執(zhí)行步驟6創(chuàng)建工作線程并執(zhí)行任務,否則執(zhí)行步驟10?拒絕任務(執(zhí)行拒絕策略)
- 若阻塞隊列未滿,添加任務到阻塞隊列,若線程狀態(tài)不為運行中,則任務從隊列中取出,并執(zhí)行步驟10 拒絕任務(執(zhí)行拒絕策略)
五、源碼解析
5.1?核心方法execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//通過位運算,獲取當前線程數(shù),如果線程數(shù)小于corePoolSize,則執(zhí)行addWorker(),創(chuàng)建新線程執(zhí)行任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 若添加工作任務執(zhí)行任務不成功,嘗試添加到阻塞隊列中
c = ctl.get();
}
// 線程數(shù)超過核心線程數(shù),任務添加到阻塞隊列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 添加到阻塞隊列,但若線程池狀態(tài)不是運行中,則從隊列中取出
if (! isRunning(recheck) && remove(command))
// 并且執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 阻塞隊列已滿,則若工作線程數(shù)小于最大線程數(shù) 創(chuàng)建線程執(zhí)行任務
else if (!addWorker(command, false))
// 若線程數(shù)已達最大線程數(shù),否則執(zhí)行拒絕策略
reject(command);
}
從源碼中可以清晰看出線程池執(zhí)行任務的邏輯與“四、工作原理”所述一致。
5.2?添加任務addWork()
THreadPoolExecutor內部類Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
//當前Worker所處于的線程
final Thread thread;
//待執(zhí)行的任務
Runnable firstTask;
//任務計數(shù)器
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
它是對Runnable進行了封裝,主要功能是對待執(zhí)行的任務進行中斷處理和狀態(tài)監(jiān)控。Worker還繼承了AQS,在每個任務執(zhí)行時進行了加鎖的處理??梢詫orker簡單理解為可中斷的、可進行鎖處理的Runnable。
創(chuàng)建工作線程并執(zhí)行是addWork()方法,源碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋,判斷線程池狀態(tài),并對線程數(shù)量執(zhí)行原子+1操作
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// 如果線程池已經(jīng)關閉,則直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 判斷線程數(shù)是否已達上限,根據(jù)傳入?yún)?shù)core的不同,判斷corePoolSize或者maximumPoolSize。
// 如果線程數(shù)已達上限,直接返回false
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 執(zhí)行原子操作,對線程數(shù)+1
if (compareAndIncrementWorkerCount(c))
// 執(zhí)行原子操作成功,則退出自旋 并 創(chuàng)建工作線程執(zhí)行任務
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// ctl變量操作成功,執(zhí)行Worker相關邏輯
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個新的Worker,傳入待執(zhí)行的任務
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 加鎖后,再次判斷線程池狀態(tài)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果創(chuàng)建了新的Worker,則調用其start方法立即執(zhí)行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWork()方法首先在一個自旋中做三個判斷:
- 線程是否關閉,若關閉則直接返回false退出
- 通過參數(shù)core來確定工作線程數(shù)與核心線程數(shù)比較?還是?與最大線程數(shù)比較,若工作線程數(shù)大,則返回false退出
- CAS嘗試將線程數(shù)加1,若成功則創(chuàng)建一個辨析的Worker并立即執(zhí)行其start()方法執(zhí)行該任務。
5.3?執(zhí)行任務runWork()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 指向線程池調execute添加的任務
Runnable task = w.firstTask;
w.firstTask = null;
// 首先釋放鎖,允許中斷
w.unlock();
boolean completedAbruptly = true;
try {
// 從worker中取第1個任務,若任務為空則從阻塞隊列中取任務,直到返回null,這里達到線程復用的效果,實現(xiàn)線程處理多個任務。
while (task != null || (task = getTask()) != null) {
// 執(zhí)行任務前先加鎖
w.lock();
// 如果線程池已經(jīng)終止,則中斷該線程。保存了線程池在STOP狀態(tài)下線程中斷的,非STOP狀態(tài)下線程沒有被中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 記錄正在運行的任務
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(調任務的run方法)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行完任務,清除記錄信息
afterExecute(task, thrown);
}
} finally {
task = null;
// 當前Worker計數(shù)器+1,統(tǒng)計worker執(zhí)行了多少任務,最后累加進completedTaskCount變量,可以調用相應方法返回一些統(tǒng)計信息。
w.completedTasks++;
// 釋放鎖
w.unlock();
}
}
// 表示worker是否異常終止,執(zhí)行到這里代表執(zhí)行正常,后續(xù)的方法需要這個變量
completedAbruptly = false;
} finally {
// completedTasks累加到completedTaskCount變量中
processWorkerExit(w, completedAbruptly);
}
}
runWorker()的主要邏輯就是進行線程池的關閉檢查,然后執(zhí)行任務,并將計數(shù)器+1。
注意這行代碼?while (task != null || (task = getTask()) != null) ,當task = w.firstTask?的值為null時執(zhí)行task = getTask(),?getTask是從任務列隊是取任務。也就是說,Worker在執(zhí)行完提交給自己的任務后,會執(zhí)行任務隊列中的任務。
5.4?阻塞隊列取任務getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
說明:
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))?這行代碼體現(xiàn)出了 SHUTDOWN狀態(tài)和 STOP狀態(tài)的區(qū)別。若線程池狀態(tài)為 SHUDOWN 狀態(tài),則條件為 false,取任務執(zhí)行;而如果線程池的狀態(tài)為 STOP 狀態(tài),則條件為 true,不管隊列是否還有任務,不再處理了。
- timed后在的判斷邏輯有點復雜,以下幾種情況為true,CAS嘗試將線程數(shù)減1
- 工作線程數(shù)大于最大線程數(shù)(后面wc>1||workQueue.isEmpty()應該自然滿足)(可能是在運行中調用setMaximumPoolSize)
- 設置了allowCoreThreadTimeOut為true且隊列中取的任務為null,說明沒任務了
- 工作線程數(shù)大于核心線程數(shù)?且隊列中取的任務為null(后面wc>1||workQueue.isEmpty()應該自然滿足)
- try后面邏輯
- 延時取任務:allowCoreThreadTimeOut為true 或者 wc > corePoolSize
- 直接取任務(若沒任務則阻塞等待):allowCoreThreadTimeOut為false?或者 wc <= corePoolSize
結論:
- allowCoreThreadTimeOut設置為true時,工作線程數(shù)達最大之后,因無新任務而線程減少,工作線程總數(shù)最小值可以為0
- allowCoreThreadTimeOut設置為false時,只有wc大于核心線程數(shù),才去做CAS減線程數(shù)操作,所以工作線程數(shù)達到最大之后,因無新任務而線程減少,工作線程總數(shù)最小為核心線程數(shù)
5.5 Worker無任務最后處理processWorkerExit()
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
// 判斷狀態(tài)是否小于STOP
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
說明:
- decrementWorkerCount():如果為異常結束,則工作線程數(shù)減1
- try?邏輯:加鎖累加完成任務數(shù)
- tryTerminate():?嘗試終止線程池
- 判斷狀態(tài)是否小于STOP為true
- allowCoreThreadTimeOut設置為true
- 若隊列不為空:至少保留一個worker
- 若隊列為空:直接退出,線程池的worker數(shù)減少,最終可能為0
- allowCoreThreadTimeOut設置為false:?則保持worker數(shù)不少于corePoolSize(若線程數(shù)小于corePoolSize,則添加 null任務的worker
- allowCoreThreadTimeOut設置為true
總結worker:線程池啟動后,worker在池內創(chuàng)建,包裝了提交的Runnable任務并執(zhí)行,執(zhí)行完就等待下一個任務,不再需要時就結束。
5.6 關閉線程池
從圖3.1.2-1看出有兩種方法關閉線程池:
- shutdown:?不能再提交任務,已經(jīng)提交的任務可繼續(xù)執(zhí)行;
- shutdownNow:?不能再提交任務,已經(jīng)提交的任務未執(zhí)行的任務不再執(zhí)行,正在執(zhí)行的任務可繼續(xù)執(zhí)行,但會中斷,返回已提交未執(zhí)行的任務
5.6.1?關閉線程池shutdown()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
說明:
- checkShutdownAccess():?安裝策略機構
- advanceRunState(SHUTDOWN):?線程池狀態(tài)切換到SHUTDOWN狀態(tài)
- interruptIdleWorkers():?中斷所有空閑的worker
- ?tryTerminate():?嘗試結束線程池
5.6.2?關閉線程池shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
說明:
- checkShutdownAccess():?安裝策略機構
- advanceRunState(STOP):?線程池狀態(tài)切換到STOP狀態(tài)
- interruptWorkers():?中斷所有空閑的worker
- drainQueue():?取出等待隊列里未執(zhí)行的任務
- tryTerminate():?嘗試結束線程池
5.6.3?嘗試結束線程池tryTerminate()
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
說明:
- ctl.get():獲取ctl值判斷線程池狀態(tài),以下狀態(tài)不處理直接return
- RUNNING狀態(tài),正在運行狀態(tài)肯定不能停
- TIDYING或TERMINATED狀態(tài),已經(jīng)沒有正在運行的worker了
- SHUTDOWN狀態(tài)且阻塞隊列不為空,執(zhí)行完才能停
- 工作線程數(shù)不為0。又調了一次interruptIdleWorkers(ONLY_ONE),可能疑惑在調tryTerminate之前時已經(jīng)調用過了,為什么又調用,而且每次只中斷一個空閑worker?我們需要知道,shutdown時worker可能在執(zhí)行中,執(zhí)行完阻塞在隊列的take,不知道要結束,所有要補充調用interruptIdleWorkers。每次只中斷一個是因為processWorkerExit時,還會執(zhí)行tryTerminate,自動中斷下一個空閑的worker。
- try邏輯:加鎖CAS嘗試將線程池狀態(tài)切換成TIDYING,再切換成TERMINATED狀態(tài),terminated是空方法供子類來實現(xiàn)。
5.6.4?判斷線程池是否關閉awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
說明:接收人timeout和TimeUnit兩個參數(shù),用于設定超時時間及單位。當?shù)却^設定時間時,會監(jiān)測ExecutorService是否已經(jīng)關閉,若關閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。文章來源:http://www.zghlxwxcb.cn/news/detail-728213.html
ps:?以上是研讀源碼加上翻閱許多文獻理解的總結,如有錯誤或不足的地方,歡迎指出,歡迎留言交流。我會繼續(xù)努力學習和分享更多有干貨的內容。文章來源地址http://www.zghlxwxcb.cn/news/detail-728213.html
到了這里,關于深入理解Java線程池ThreadPoolExcutor實現(xiàn)原理、數(shù)據(jù)結構和算法(源碼解析)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!