參考文章:
《Java 并發(fā)編程的藝術(shù)》
7000 字 + 24 張圖帶你徹底弄懂線程池
什么是線程池?有什么優(yōu)缺點(diǎn)?
(1)線程池 (ThreadPool)
是一種用于管理和復(fù)用線程的機(jī)制,它是在程序啟動(dòng)時(shí)就預(yù)先創(chuàng)建一定數(shù)量的線程,將這些線程放入一個(gè)池中,并對(duì)它們進(jìn)行有效的管理和復(fù)用,從而在需要執(zhí)行任務(wù)時(shí),可以從線程池中獲取一個(gè)可用線程來(lái)執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后線程不會(huì)被銷毀而是返回線程池,以便下次使用。
(2)線程池的優(yōu)點(diǎn)主要有以下幾個(gè)方面:
- 降低資源消耗:線程的創(chuàng)建和銷毀都需要一定的系統(tǒng)資源和時(shí)間,如果每個(gè)任務(wù)都單獨(dú)創(chuàng)建一個(gè)線程,則會(huì)浪費(fèi)大量的系統(tǒng)資源和時(shí)間。而使用線程池可以避免這種浪費(fèi),通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應(yīng)速度:由于線程的創(chuàng)建和銷毀需要一定的時(shí)間和系統(tǒng)資源,如果任務(wù)的數(shù)量很多,頻繁地創(chuàng)建和銷毀線程會(huì)造成系統(tǒng)的負(fù)載過(guò)重,導(dǎo)致性能下降。而線程池可以預(yù)先創(chuàng)建一定數(shù)量的線程,當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性:線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。但是要做到合理利用線程池,必須對(duì)其實(shí)現(xiàn)原理了如指掌。
(3)線程池的缺點(diǎn)主要有以下幾個(gè)方面:
- 內(nèi)存泄漏:如果線程池中的線程沒(méi)有正確地釋放或回收,可能會(huì)導(dǎo)致內(nèi)存泄漏問(wèn)題。
- 阻塞問(wèn)題:如果線程池中的線程都被占滿,可能會(huì)導(dǎo)致任務(wù)被阻塞,從而降低應(yīng)用程序的響應(yīng)性。
- 線程安全問(wèn)題:如果線程池中的線程共享數(shù)據(jù),可能會(huì)導(dǎo)致線程安全問(wèn)題,例如競(jìng)態(tài)條件和死鎖。
- 性能問(wèn)題:如果線程池的大小不合適或者線程的執(zhí)行時(shí)間不均衡,可能會(huì)導(dǎo)致性能問(wèn)題,例如響應(yīng)延遲和系統(tǒng)負(fù)載過(guò)高。
創(chuàng)建線程池的方式有哪些?
通過(guò) Executor 框架的工具類 Executors 來(lái)創(chuàng)建不同類型的線程池
(1)通過(guò)工具類 Executors,可以創(chuàng)建以下 3 種類型的 ThreadPoolExecutor
:
- FixedThreadPool:FixedThreadPool 被稱為可重用固定線程數(shù)的線程池。該線程池中的線程數(shù)量始終不變。當(dāng)有一個(gè)新的任務(wù)提交時(shí),線程池中若有空閑線程,則立即執(zhí)行。若沒(méi)有,則新的任務(wù)會(huì)被暫存在一個(gè)任務(wù)隊(duì)列中,待有線程空閑時(shí),便處理在任務(wù)隊(duì)列中的任務(wù)。
//創(chuàng)建一個(gè)固定大小的線程池,大小為 3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- SingleThreadExecutor: SingleThreadExecutor是使用單個(gè) worker 線程的 Executor。若多余一個(gè)任務(wù)被提交到該線程池,任務(wù)會(huì)被保存在一個(gè)任務(wù)隊(duì)列中,待線程空閑,按先入先出的順序執(zhí)行隊(duì)列中的任務(wù)。
//創(chuàng)建一個(gè)單線程的線程池
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- CachedThreadPool:CachedThreadPool 是一個(gè)會(huì)根據(jù)需要?jiǎng)?chuàng)建新線程的線程池。線程池的線程數(shù)量不確定,但若有空閑線程可以復(fù)用,則會(huì)優(yōu)先使用可復(fù)用的線程。若所有線程均在工作,又有新的任務(wù)提交,則會(huì)創(chuàng)建新的線程處理任務(wù)。所有線程在當(dāng)前任務(wù)執(zhí)行完畢后,將返回線程池進(jìn)行復(fù)用。
//創(chuàng)建一個(gè)單線程的線程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
(2)通過(guò)工具類 Executors,可以創(chuàng)建下面的 ScheduledThreadPoolExecutor
,它是一個(gè)用來(lái)在給定的延遲后運(yùn)行任務(wù)或者定期執(zhí)行任務(wù)的線程池,繼承自 ThreadPoolExecutor
,并且其內(nèi)部使用DelayedWorkQueue
作為任務(wù)隊(duì)列。
//創(chuàng)建一個(gè)定時(shí)執(zhí)行的線程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
具體示例如下:
class ScheduledThreadPoolExecutorExample {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
// 定時(shí)任務(wù),每隔 1 秒執(zhí)行一次
executor.scheduleAtFixedRate(() -> {
System.out.println("定時(shí)任務(wù)執(zhí)行了:" + System.currentTimeMillis());
}, 0, 1, TimeUnit.SECONDS);
// 延時(shí)任務(wù),延遲 2 秒后執(zhí)行
executor.schedule(() -> {
System.out.println("延時(shí)任務(wù)執(zhí)行了:" + System.currentTimeMillis());
}, 2, TimeUnit.SECONDS);
// 關(guān)閉線程池
executor.shutdown();
}
}
使用 ThreadPoolExecutor 類自定義線程池
ThreadPoolExecutor
是 Executor 框架的一個(gè)具體實(shí)現(xiàn),通過(guò)自定義 ThreadPoolExecutor 類可以創(chuàng)建更加靈活、符合需求的線程池。ThreadPoolExecutor
的 4 個(gè)構(gòu)造函數(shù)如下所示,有關(guān)每個(gè)參數(shù)的具體含義可參考面試題 3。
?注意事項(xiàng)
《阿里巴巴 Java 開發(fā)手冊(cè)》中有如下規(guī)定:
(1)【強(qiáng)制】線程資源必須通過(guò)線程池提供,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
說(shuō)明:線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時(shí)間以及系統(tǒng)資源的開銷,解決資源不足的問(wèn)題。如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗完內(nèi)存或者“過(guò)度切換”的問(wèn)題。
(2)【強(qiáng)制】線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò) ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
說(shuō)明:Executors 返回的線程池對(duì)象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM。
2) CachedThreadPool:允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM。
(3)使用 Executors.newCachedThreadPool()
方法創(chuàng)建線程池可能會(huì)導(dǎo)致以下問(wèn)題:
-
線程數(shù)量過(guò)多:
newCachedThreadPool
是一個(gè)無(wú)界線程池,它會(huì)根據(jù)任務(wù)的需求動(dòng)態(tài)地創(chuàng)建線程。如果任務(wù)提交的速度遠(yuǎn)遠(yuǎn)大于線程處理的速度,線程池將會(huì)不斷地創(chuàng)建新線程,可能導(dǎo)致線程數(shù)量過(guò)多,從而占用大量系統(tǒng)資源(如內(nèi)存和CPU),降低系統(tǒng)性能甚至引發(fā)內(nèi)存溢出或系統(tǒng)崩潰。 - 線程過(guò)多導(dǎo)致線程切換開銷增加:線程切換是有開銷的,當(dāng)線程數(shù)量過(guò)多時(shí),系統(tǒng)需要頻繁地進(jìn)行線程切換,導(dǎo)致額外的開銷,降低了整體的執(zhí)行效率。
-
對(duì)長(zhǎng)時(shí)間執(zhí)行的任務(wù)不適用:由于
newCachedThreadPool
是為了快速處理任務(wù)而創(chuàng)建線程的,它會(huì)緩存線程一段時(shí)間以備重復(fù)利用,默認(rèn)的空閑線程存活時(shí)間是60s
。這對(duì)于長(zhǎng)時(shí)間執(zhí)行的任務(wù)來(lái)說(shuō)可能不適用,因?yàn)殚L(zhǎng)時(shí)間運(yùn)行的任務(wù)可能需要維持相對(duì)穩(wěn)定的線程池大小,而不是頻繁地創(chuàng)建和銷毀線程。 - 可能出現(xiàn)任務(wù)飽和:當(dāng)任務(wù)數(shù)量超過(guò)線程池的處理能力時(shí),剩余的任務(wù)將會(huì)排隊(duì)等待執(zhí)行。如果長(zhǎng)時(shí)間保持任務(wù)提交速度高于線程處理速度,會(huì)導(dǎo)致任務(wù)隊(duì)列越來(lái)越長(zhǎng),最終可能造成任務(wù)飽和,系統(tǒng)響應(yīng)變得極慢。
因此,在使用 newCachedThreadPool
創(chuàng)建線程池時(shí)需要謹(jǐn)慎,根據(jù)實(shí)際場(chǎng)景合理地評(píng)估任務(wù)的特點(diǎn)和負(fù)載情況,選擇適當(dāng)?shù)木€程池類型和參數(shù)配置,以避免以上問(wèn)題的發(fā)生,并保證系統(tǒng)的性能和穩(wěn)定性。
?自定義線程池時(shí)有哪些參數(shù)?它們各有說(shuō)明含義?
構(gòu)造函數(shù)
使用下面 ThreadPoolExecutor 類的構(gòu)造函數(shù)可以自定義線程池,參數(shù)最全的構(gòu)造函數(shù)如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
參數(shù)含義
其中的參數(shù)含義如下所示:
參數(shù) | 含義 |
---|---|
corePoolSize | 線程池基本大小,當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建 |
maximumPoolSize | 線程池允許創(chuàng)建的最大線程數(shù),如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù),值得注意的是,如果使用了無(wú)界的任務(wù)隊(duì)列,那么這個(gè)參數(shù)就沒(méi)什么效果 |
keepAliveTime |
線程活動(dòng)保持時(shí)間,線程池中的線程數(shù)量大于 corePoolSize 時(shí),如果這時(shí)沒(méi)有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待,直到等待的時(shí)間超過(guò)了 keepAliveTime 才會(huì)被回收銷毀 |
unit | 線程活動(dòng)保持時(shí)間的單位,即 keepAliveTime 的時(shí)間單位 |
workQueue | 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,新任務(wù)來(lái)的時(shí)候會(huì)先判斷當(dāng)前運(yùn)行的線程數(shù)量是否達(dá)到核心線程數(shù),如果達(dá)到的話,新任務(wù)就會(huì)被存放在任務(wù)隊(duì)列中。值得注意的是,如果使用了工作隊(duì)列 SynchronousQueue,那么當(dāng)任務(wù)數(shù)超過(guò)線程池的核心線程數(shù)時(shí),該任務(wù)不會(huì)進(jìn)入隊(duì)列 |
threadFactory | 用于設(shè)置創(chuàng)建線程的工廠,決定了如何創(chuàng)建新線程,例如可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字。使用開源框架 guava 提供的 ThreadFactoryBuilder 可以快速給線程池里的線程設(shè)置有意義的名字 |
handler | 當(dāng)隊(duì)列和線程池都滿了,說(shuō)明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個(gè)策略默認(rèn)情況下是 AbortPolicy ,表示無(wú)法處理新任務(wù)時(shí)拋出異常 |
① 上圖來(lái)源:《Java 性能調(diào)優(yōu)實(shí)戰(zhàn)》
② 上述提到的 7 個(gè)參數(shù)中,corePoolSize
、maximumPoolSize
、workQueue
這 3 個(gè)參數(shù)是核心參數(shù)。
③ 上述 workQueue 就是 BlockingQueue,有關(guān) BlockingQueue 的相關(guān)知識(shí)可以參考 Java 并發(fā)編程面試題——BlockingQueue 這篇文章。
使用示例
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
class Solution {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//自定義線程池
int corePoolSize = 2;
int maximumPoolSize = 5;
long keepAliveTime = 50;
// keepAliveTime 的單位
TimeUnit unit = TimeUnit.MICROSECONDS;
//工作隊(duì)列 workQueue
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3);
//使用開源框架 guava 提供的 ThreadFactoryBuilder 可以給線程池里的線程自定義名字
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("demo-task-%d").build();
//飽和策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor threadsPool = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize,
keepAliveTime, unit,
blockingQueue, threadFactory,
handler);
//執(zhí)行無(wú)返回值的任務(wù)
Runnable taskWithoutRet = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running");
}
};
threadsPool.execute(taskWithoutRet);
//執(zhí)行有返回值的任務(wù)
FutureTask<Integer> taskWithRet = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + " is running");
//線程睡眠 1000 ms
Thread.sleep(1000);
return 100;
}
});
threadsPool.submit(taskWithRet);
System.out.println("有返回值的任務(wù)的結(jié)果為: " + taskWithRet.get());
//關(guān)閉線程池
threadsPool.shutdown();
}
}
輸出結(jié)果如下:
demo-task-0 is running
demo-task-1 is running
有返回值的任務(wù)的結(jié)果為: 100
在 Java 8 中可以使用 lambda 表達(dá)式來(lái)對(duì)創(chuàng)建任務(wù)的代碼進(jìn)行簡(jiǎn)化:
Runnable taskWithoutRet = () -> System.out.println(Thread.currentThread().getName() + " is running");
FutureTask<Integer> taskWithRet = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
//線程睡眠 1000 ms
Thread.sleep(1000);
return 100;
});
與 FutureTask 有關(guān)的知識(shí)可以參考 Java 并發(fā)編程面試題——Future 這篇文章。
線程池的拒絕策略有哪些?如何自定義拒絕策略?
(1)如果線程到達(dá) maximumPoolSize
仍然有新任務(wù),這時(shí)線程池會(huì)執(zhí)行拒絕策略。ThreadPoolExecutor
為接口 RejectedExecutionHandler
提供了以下 4 種實(shí)現(xiàn),它們都是 ThreadPoolExecutor 中的靜態(tài)內(nèi)部類:
-
AbortPolicy:拋出
RejectedExecutionException
異常來(lái)拒絕新任務(wù)的處理,這也是默認(rèn)的策略; - CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù),也就是直接在調(diào)用 execute 方法的線程中運(yùn)行 (run) 被拒絕的任務(wù),如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。因此這種策略會(huì)降低對(duì)于新任務(wù)提交速度,影響程序的整體性能。如果應(yīng)用程序可以承受此延遲并且你要求任何一個(gè)任務(wù)請(qǐng)求都要被執(zhí)行的話,那么可以選擇這個(gè)策略;
- DiscardPolicy:不處理本次任務(wù),選擇直接放棄;
-
DiscardOldestPolicy:放棄隊(duì)列中最早的任務(wù),本任務(wù)取而代之;
(2)在 Java 線程池中,可以通過(guò)實(shí)現(xiàn) RejectedExecutionHandler
接口來(lái)自定義拒絕策略。拒絕策略定義了當(dāng)線程池?zé)o法接受新任務(wù)時(shí)應(yīng)該采取的操作。RejectedExecutionHandler
接口有一個(gè)方法 rejectedExecution
,在線程池?zé)o法接受新任務(wù)時(shí)會(huì)調(diào)用該方法。以下是一個(gè)示例代碼,演示如何自定義拒絕策略:
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
System.out.println("任務(wù)被拒絕: " + runnable.toString());
// 執(zhí)行自定義的拒絕策略,可以選擇拋出異常、丟棄任務(wù)或阻塞等操作
// 這里我們簡(jiǎn)單地拋出一個(gè)異常
throw new RejectedExecutionException("線程池已滿,無(wú)法接受新任務(wù)!");
}
}
public class ThreadPoolExample {
public static void main(String[] args) {
// 創(chuàng)建線程池
int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 1000;
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2), new CustomRejectedExecutionHandler());
// 執(zhí)行任務(wù)
for (int i = 1; i <= 15; i++) {
final int taskId = i;
threadPool.execute(() -> {
System.out.println("執(zhí)行任務(wù): " + taskId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 關(guān)閉線程池
threadPool.shutdown();
}
}
某次輸出如下:
執(zhí)行任務(wù): 1
執(zhí)行任務(wù): 5
執(zhí)行任務(wù): 7
執(zhí)行任務(wù): 2
執(zhí)行任務(wù): 3
執(zhí)行任務(wù): 4
執(zhí)行任務(wù): 9
執(zhí)行任務(wù): 8
執(zhí)行任務(wù): 11
任務(wù)被拒絕: com.exam.ThreadPoolExample$$Lambda$1/2093631819@4f3f5b24
執(zhí)行任務(wù): 10
Exception in thread "main" java.util.concurrent.RejectedExecutionException: 線程池已滿,無(wú)法接受新任務(wù)!
at com.exam.CustomRejectedExecutionHandler.rejectedExecution(Main.java:73)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.exam.ThreadPoolExample.main(Main.java:91)
執(zhí)行任務(wù): 6
在上述示例中,我們首先創(chuàng)建了一個(gè)自定義的 RejectedExecutionHandler
實(shí)現(xiàn)CustomRejectedExecutionHandler
,其中的rejectedExecution
方法簡(jiǎn)單地拋出一個(gè) RejectedExecutionException
異常。然后,在創(chuàng)建線程池時(shí),我們將上述自定義的拒絕策略傳遞給 ThreadPoolExecutor
的構(gòu)造函數(shù)。當(dāng)線程池?zé)o法接受新任務(wù)時(shí),拒絕策略會(huì)被調(diào)用,我們?cè)谶@個(gè)方法中拋出一個(gè)異常,顯示任務(wù)被拒絕。你也可以根據(jù)自己的需求,實(shí)現(xiàn)不同的拒絕策略,例如將任務(wù)丟棄、阻塞等。請(qǐng)注意,當(dāng)自定義拒絕策略拋出異常時(shí),可能需要適當(dāng)?shù)靥幚懋惓?,以免影響程序的?zhí)行。
?線程池處理任務(wù)的流程是什么樣的?
(1)線程池的主要處理流程如下圖所示:
(2)從圖中可以看出,當(dāng)提交一個(gè)新任務(wù)到線程池時(shí),線程池的處理流程如下:
- 線程池判斷當(dāng)前工作線程數(shù)是否小于
核心線程數(shù) (其大小為 corePoolSize)
。如果小于,即核心線程池未滿,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果大于,即核心線程池里的線程都在執(zhí)行任務(wù),則進(jìn)入下個(gè)流程; - 線程池判斷
工作隊(duì)列 (workQueue)
是否已經(jīng)滿。如果工作隊(duì)列沒(méi)有滿,則將新提交的任務(wù)存儲(chǔ)在這個(gè)工作隊(duì)列里。如果工作隊(duì)列滿了,則進(jìn)入下個(gè)流程; - 線程池判斷
線程池的線程 (大小為 maximumPoolSize)
是否都處于工作狀態(tài)。如果沒(méi)有,則創(chuàng)建一個(gè)新的工作線程來(lái)執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略 (RejectedExecutionHandler)
來(lái)處理這個(gè)任務(wù);
?為什么線程池要先把任務(wù)放在阻塞隊(duì)列中,然后再調(diào)用非核心線程處理任務(wù)?
(1)線程池將任務(wù)放在阻塞隊(duì)列中,并先調(diào)用非核心線程來(lái)處理任務(wù)的原因有以下幾點(diǎn):
- 平衡資源利用率:通過(guò)將任務(wù)放在阻塞隊(duì)列中,可以避免瞬間來(lái)臨的大量任務(wù)導(dǎo)致創(chuàng)建過(guò)多的線程,從而平衡了系統(tǒng)資源的利用率。非核心線程可以限制線程數(shù)量,而阻塞隊(duì)列可以緩沖任務(wù),讓系統(tǒng)在承載能力范圍內(nèi)保持穩(wěn)定的負(fù)載。
- 控制線程數(shù)量:通過(guò)將任務(wù)放在阻塞隊(duì)列中,非核心線程可以通過(guò)輪詢阻塞隊(duì)列來(lái)獲取任務(wù)并執(zhí)行。這種方式可以限制線程的數(shù)量,防止線程過(guò)多占用過(guò)多資源,避免系統(tǒng)資源耗盡。
- 避免任務(wù)丟失:當(dāng)任務(wù)到達(dá)時(shí),如果線程池中的非核心線程都已經(jīng)被占用,那么新的任務(wù)將被放在阻塞隊(duì)列中等待執(zhí)行。這樣可以避免任務(wù)丟失,盡可能確保所有任務(wù)都能得到處理。
(2)綜上所述,在線程池中先將任務(wù)放在阻塞隊(duì)列中,然后再調(diào)用非核心線程來(lái)處理任務(wù),可以實(shí)現(xiàn)對(duì)線程數(shù)量的控制、平衡資源利用率、提高響應(yīng)速度和避免任務(wù)丟失等好處。這樣設(shè)計(jì)可以更有效地管理和利用系統(tǒng)資源,并提供更高效的并發(fā)處理能力。
線程池中線程實(shí)現(xiàn)復(fù)用的原理是什么?
(1)線程池的核心功能就是實(shí)現(xiàn)了線程的重復(fù)利用,而在線程池內(nèi)部其實(shí)是被封裝成一個(gè) Worker
對(duì)象:
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//...
}
}
從上述源碼可知,Worker 繼承了 AQS,即有一定鎖的特性。創(chuàng)建線程來(lái)執(zhí)行任務(wù)的方法上面提到是通過(guò) addWorker
方法創(chuàng)建的。在創(chuàng)建 Worker 對(duì)象的時(shí)候,會(huì)把線程和任務(wù)一起封裝到 Worker 內(nèi)部,然后調(diào)用 runWorker
方法來(lái)讓線程執(zhí)行任務(wù):
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}
(2)runWorker
方法內(nèi)部使用了 while 死循環(huán),當(dāng)?shù)谝粋€(gè)任務(wù)執(zhí)行完之后,會(huì)不斷地通過(guò) getTask
方法獲取任務(wù):
- 如果能獲取到任務(wù),就會(huì)調(diào)用
run
方法,繼續(xù)執(zhí)行任務(wù),這就是線程能夠復(fù)用的主要原因。 - 如果不能獲取到任務(wù),最后就會(huì)調(diào)用 finally 中的
processWorkerExit
方法,來(lái)將線程退出。
(3)需要注意的是,因?yàn)?Worker 繼承了 AQS,每次在執(zhí)行任務(wù)之前都會(huì)調(diào)用 lock
方法,執(zhí)行完任務(wù)之后,會(huì)調(diào)用 unlock
方法,這樣做的目的在于通過(guò) Woker 的加鎖狀態(tài)就能判斷出當(dāng)前線程是否正在運(yùn)行任務(wù)。
(4)如果想知道線程是否正在運(yùn)行任務(wù),只需要調(diào)用 Woker 的 tryLock
方法,根據(jù)是否加鎖成功來(lái)進(jìn)行判斷:加鎖成功說(shuō)明當(dāng)前線程沒(méi)有加鎖,也就沒(méi)有執(zhí)行任務(wù)。在調(diào)用 shutdown
方法關(guān)閉線程池的時(shí)候,就是通過(guò)這種方式來(lái)判斷線程有沒(méi)有在執(zhí)行任務(wù),如果沒(méi)有的話,來(lái)嘗試打斷沒(méi)有執(zhí)行任務(wù)的線程。
線程是如何獲取任務(wù)的以及如何實(shí)現(xiàn)超時(shí)的?
線程在執(zhí)行完任務(wù)之后,會(huì)繼續(xù)從 getTask
方法中獲取任務(wù),獲取不到就會(huì)退出。getTask
方法的源碼如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
前面一部分是線程池的一些狀態(tài)的判斷,下面這行代碼
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
是判斷當(dāng)前過(guò)來(lái)獲取任務(wù)的線程是否可以超時(shí)退出。如果 allowCoreThreadTimeOut
設(shè)置為 true 或者線程池當(dāng)前的線程數(shù)大于核心線程數(shù),也就是 corePoolSize,那么該獲取任務(wù)的線程就可以超時(shí)退出。那是怎么做到超時(shí)退出呢,就是這行核心代碼
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
會(huì)根據(jù)是否允許超時(shí)來(lái)選擇調(diào)用阻塞隊(duì)列 workQueue 的 poll
方法或者 take
方法:
- 如果允許超時(shí),則會(huì)調(diào)用
poll
方法,傳入keepAliveTime
,也就是構(gòu)造線程池時(shí)傳入的空閑時(shí)間,這個(gè)方法的意思就是從隊(duì)列中阻塞 keepAliveTime 時(shí)間來(lái)獲取任務(wù),獲取不到就會(huì)返回 null; - 如果不允許超時(shí),就會(huì)調(diào)用 take 方法,這個(gè)方法會(huì)一直阻塞獲取任務(wù),直到從隊(duì)列中獲取到任務(wù)位置。
從這里可以看到 keepAliveTime 是如何使用的了。所以到這里應(yīng)該就知道線程池中的線程為什么可以做到空閑一定時(shí)間就退出了。其實(shí)最主要的是利用了阻塞隊(duì)列的 poll 方法的實(shí)現(xiàn),這個(gè)方法可以指定超時(shí)時(shí)間,一旦線程達(dá)到了 keepAliveTime 還沒(méi)有獲取到任務(wù),那么就會(huì)返回 null,前面提到 getTask 方法返回 null,線程就會(huì)退出。
需要注意的是,判斷當(dāng)前獲取任務(wù)的線程是否可以超時(shí)退出的時(shí)候,如果將 allowCoreThreadTimeOut 設(shè)置為 true,那么所有線程走到這個(gè)timed 都是 true,那么所有的線程,包括核心線程都可以做到超時(shí)退出。如果你的線程池需要將核心線程超時(shí)退出,那么可以通過(guò)allowCoreThreadTimeOut
方法將 allowCoreThreadTimeOut
變量設(shè)置為 true。
線程池的關(guān)閉是如何進(jìn)行的?
線程池提供了 shutdown
和 shutdownNow
兩個(gè)方法來(lái)關(guān)閉線程池。
shutdown()
shutdown
方法將線程池的狀態(tài)修改為 SHUTDOWN
,然后嘗試打斷空閑的線程(如何判斷空閑,前面有提到),也就是在阻塞等待任務(wù)的線程。
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
}
shutdownNow()
shutdownNow
方法將線程池的狀態(tài)修改為 STOP
,然后嘗試打斷所有的線程,從阻塞隊(duì)列中移除剩余的任務(wù),這也是為什么 shutdownNow 不能執(zhí)行剩余任務(wù)的原因。
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
}
所以也可以看出 shutdown 方法和 shutdownNow 方法的主要區(qū)別就是:shutdown 之后還能處理在隊(duì)列中的任務(wù),shutdownNow 直接就將任務(wù)從隊(duì)列中移除,線程池里的線程就不再處理了。
如果在線程池中配置了有限的最大線程數(shù)量、有界隊(duì)列,那么當(dāng)請(qǐng)求過(guò)多時(shí),應(yīng)該如何解決任務(wù)可能丟失的問(wèn)題?
可以通過(guò)實(shí)現(xiàn) RejectedExecutionHandler
接口來(lái)自定義拒絕策略,如果線程池?zé)o法執(zhí)行更多的任務(wù)了,此時(shí)建議可以把任務(wù)信息持久化寫入到磁盤里面去,后臺(tái)專門啟動(dòng)一個(gè)線程,后續(xù)等待你的線程池的工作負(fù)荷降低了,再慢慢的從磁盤中讀取之間持久化的任務(wù),重新提交到線程池中去執(zhí)行。
如果線上機(jī)器宕機(jī),線程池中的阻塞隊(duì)列可能還有未處理的任務(wù),應(yīng)該如何解決這個(gè)問(wèn)題?
在提交一個(gè)任務(wù)到線程池里去之前,可以先將該任務(wù)的信息保存到數(shù)據(jù)庫(kù)中,并記錄其狀態(tài)(狀態(tài)包括未提交、已提交、已完成,初始時(shí)為未提交)。當(dāng)該任務(wù)提交之后,更新其狀態(tài)為已提交即可。系統(tǒng)重啟之后后臺(tái)線程去掃描數(shù)據(jù)庫(kù)里的未提交和已提交的任務(wù)狀態(tài)。可以把未提交、已提交的任務(wù)重新提交到線程池中繼續(xù)執(zhí)行即可。
線程池的隊(duì)列滿了之后可能會(huì)發(fā)生什么?
對(duì)于這個(gè)問(wèn)題,我們可以分情況進(jìn)行討論:
- 如果線程池中的隊(duì)列滿了并且創(chuàng)建的線程數(shù)達(dá)到了
corePoolSize
之后,會(huì)判斷如果maximumPoolSize
大于coolPoolSize
,此時(shí)會(huì)創(chuàng)建不會(huì)大于maximumPoolSize
額外的線程處理隊(duì)列中的請(qǐng)求。 - 如果隊(duì)列中的請(qǐng)求處理完了,額外的線程數(shù)會(huì)存活
keepAliveTime
時(shí)間后會(huì)自動(dòng)銷毀。 - 如果隊(duì)列滿了,并且額外的線程也已經(jīng)滿負(fù)荷了。這個(gè)時(shí)候會(huì)執(zhí)行對(duì)應(yīng)的拒絕策略。
ThreadPoolExecutor 執(zhí)行 execute() 方法的執(zhí)行流程是什么樣的?
(1)ThreadPoolExecutor 執(zhí)行 execute() 方法的執(zhí)行流程如下如所示,我們可以將其看作線程池執(zhí)行流程在代碼中的具體表現(xiàn)。
(2)ThreadPoolExecutor 執(zhí)行 execute 方法分下面 4 種情況:
① 如果當(dāng)前運(yùn)行的線程少于 corePoolSize
,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
② 如果運(yùn)行的線程等于或多于 corePoolSize
,則將任務(wù)加入 BlockingQueue
。
③ 如果無(wú)法將任務(wù)加入 BlockingQueue
(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
④ 如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出 maximumPoolSize
,任務(wù)將被拒絕,并調(diào)用 RejectedExecutionHandler.rejectedExecution()
方法。
ThreadPoolExecutor 采取上述步驟的總體設(shè)計(jì)思路,是為了在執(zhí)行 execute()
方法時(shí),盡可能地避免獲取全局鎖(那將會(huì)是一個(gè)嚴(yán)重的可伸縮瓶頸)。在 ThreadPoolExecutor 完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于等于 corePoolSize),幾乎所有的 execute() 方法調(diào)用都是執(zhí)行步驟 ②,而步驟 ② 不需要獲取全局鎖。
如何給線程池命名?
初始化線程池的時(shí)候需要顯示命名(設(shè)置線程池名稱前綴),有利于定位問(wèn)題。默認(rèn)情況下創(chuàng)建的線程名字類似 pool-1-thread-n
這樣的,沒(méi)有業(yè)務(wù)含義,不利于定位問(wèn)題。給線程池里的線程命名通常有下面兩種方式:
使用 guava 的 ThreadFactoryBuilder
ThreadPoolExecutor 的構(gòu)造函數(shù)中的參數(shù) ThreadFactory
是用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字。使用開源框架 guava
提供的 ThreadFactoryBuilder
可以快速給線程池里的線程設(shè)置有意義的名字,代碼如下:
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
實(shí)現(xiàn) ThreadFactory 接口
- 線程池參數(shù)
threadFactory
的作用是為線程池創(chuàng)建新線程并提供線程的配置。通過(guò)自定義 threadFactory,可以靈活地指定線程的屬性。當(dāng)線程池需要?jiǎng)?chuàng)建新線程來(lái)執(zhí)行任務(wù)時(shí),會(huì)調(diào)用 threadFactory 的newThread()
方法創(chuàng)建一個(gè)線程,并將其添加到線程池中。因此,通過(guò)自定義 threadFactory,我們可以對(duì)新創(chuàng)建的線程進(jìn)行一些個(gè)性化的設(shè)置,例如線程名、優(yōu)先級(jí)、是否為守護(hù)線程等。 - 常見(jiàn)的線程池實(shí)現(xiàn)類(如 ThreadPoolExecutor)通常提供了一個(gè)默認(rèn)的 threadFactory,如果不指定 threadFactory 參數(shù),會(huì)使用默認(rèn)的線程工廠。但是,我們也可以根據(jù)需求自己實(shí)現(xiàn)
ThreadFactory
接口,以創(chuàng)建滿足特定要求的線程。 - 總結(jié)起來(lái),threadFactory 參數(shù)在線程池中的作用是用于創(chuàng)建新線程,并可以根據(jù)需要進(jìn)行線程的個(gè)性化配置。通過(guò)自定義 threadFactory,我們能夠更好地控制和管理線程池中的線程。
//帶有前綴名稱的線程工廠
public static class NamedThreadFactory implements ThreadFactory {
//線程名前綴
private final String prefix;
//線程編號(hào)
private final AtomicInteger threadNumber = new AtomicInteger(1);
public NamedThreadFactory(String prefix) {
this.prefix = prefix;
}
//重寫 newThread 方法
@Override
public Thread newThread(Runnable r) {
return new Thread(null, r, prefix + threadNumber.getAndIncrement());
}
}
線程池的狀態(tài)有哪些?
ThreadPoolExecutor 使用變量 ctl 的高 3 位來(lái)表示線程池狀態(tài),低 29 位表示線程數(shù)量。
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
}
狀態(tài)名 | 高 3 位 | 接收新任務(wù) | 處理阻塞隊(duì)列任務(wù) | 說(shuō)明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | 線程池創(chuàng)建時(shí)就是這個(gè)狀態(tài),能夠接收新任務(wù),以及對(duì)已添加的任務(wù)進(jìn)行處理。 |
SHUTDOWN | 000 | N | Y | 調(diào)用shutdown方法線程池就會(huì)轉(zhuǎn)換成SHUTDOWN狀態(tài),此時(shí)線程池不再接收新任務(wù),但能繼續(xù)處理已添加的任務(wù)到隊(duì)列中任務(wù)。 |
STOP | 001 | N | N | 調(diào)用 shutdownNow 方法線程池就會(huì)轉(zhuǎn)換成 STOP 狀態(tài),不接收新任務(wù),也不能繼續(xù)處理已添加的任務(wù)到隊(duì)列中任務(wù),并且會(huì)嘗試中斷正在處理的任務(wù)的線程。 |
TIDYING | 010 | N | N | SHUTDOWN 狀態(tài)下,任務(wù)數(shù)為 0, 其他所有任務(wù)已終止,線程池會(huì)變?yōu)?TIDYING 狀態(tài)。線程池在 SHUTDOWN 狀態(tài),任務(wù)隊(duì)列為空且執(zhí)行中任務(wù)為空,線程池會(huì)變?yōu)?TIDYING 狀態(tài)。線程池在 STOP 狀態(tài),線程池中執(zhí)行中任務(wù)為空時(shí),線程池會(huì)變?yōu)?TIDYING 狀態(tài)。 |
TERMINATED | 011 | N | N | 線程池徹底終止。線程池在 TIDYING 狀態(tài)執(zhí)行完 terminated() 方法就會(huì)轉(zhuǎn)變?yōu)?TERMINATED 狀態(tài)。 |
從數(shù)字上比較,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
。這些信息存儲(chǔ)在一個(gè)原子變量 ctl 中,目的是將線程池狀態(tài)與線程個(gè)數(shù)合二為一,這樣就可以用一次 CAS 原子操作進(jìn)行賦值。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// c 為舊值, ctlOf 返回結(jié)果為新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 為高 3 位代表線程池狀態(tài), wc 為低 29 位代表線程個(gè)數(shù),ctl 是合并它們
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
?如何設(shè)置線程池的大???
(1)首先,需要明確的是線程池大小不能設(shè)置地過(guò)大或者過(guò)小:
- 如果線程池設(shè)置地太大,那么大量的線程可能會(huì)同時(shí)在競(jìng)爭(zhēng) CPU 資源,這樣會(huì)導(dǎo)致大量的上下文切換,從而增加線程的執(zhí)行時(shí)間,影響了整體執(zhí)行效率。
- 如果線程池設(shè)置地太小,那么當(dāng)同一時(shí)間有大量任務(wù)需要處理時(shí),可能會(huì)導(dǎo)致它們?cè)谌蝿?wù)隊(duì)列中排隊(duì)等待執(zhí)行,甚至?xí)霈F(xiàn)任務(wù)隊(duì)列滿了之后任務(wù)無(wú)法處理的情況,或者大量任務(wù)堆積在任務(wù)隊(duì)列導(dǎo)致 OOM。這樣一來(lái),CPU 沒(méi)有得到充分的利用。
(2)要想合理地配置線程池,可以從任務(wù)特性入手來(lái)進(jìn)行分析:
- 任務(wù)的性質(zhì),性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理:
- CPU 密集型任務(wù):應(yīng)配置盡可能小的線程,如配置 N c p u N_{cpu} Ncpu? + 1 個(gè)線程的線程池;
- I/O 密集型任務(wù):由于 I/O 密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如 2 ? N c p u 2 * N_{cpu} 2?Ncpu?
- 混合型任務(wù):如果可以拆分,將其拆分成一個(gè) CPU 密集型任務(wù)和一個(gè) I/O 密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒(méi)必要進(jìn)行分解。
- 任務(wù)的優(yōu)先級(jí):高、中和低。優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列
PriorityBlockingQueue
來(lái)處理。它可以讓優(yōu)先級(jí)高的任務(wù)先執(zhí)行。不過(guò)需要注意的是,如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。 - 任務(wù)的執(zhí)行時(shí)間:長(zhǎng)、中和短。執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來(lái)處理,或者可以使用優(yōu)先級(jí)隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。依賴數(shù)據(jù)庫(kù)連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫(kù)返回結(jié)果,等待的時(shí)間越長(zhǎng),則 CPU 空閑時(shí)間就越長(zhǎng),那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用 CPU。
如何判斷是 CPU 密集任務(wù)還是 I/O 密集任務(wù)?
答:簡(jiǎn)單來(lái)說(shuō),CPU 密集型任務(wù)指需要利用 CPU 計(jì)算能力的任務(wù),例如對(duì)大量數(shù)據(jù)進(jìn)行運(yùn)算。而 I/O 密集型指涉及到網(wǎng)絡(luò)讀取、文件讀取等的任務(wù),其特點(diǎn)在于 I/O 操作完成的時(shí)間遠(yuǎn)大于 CPU 計(jì)算耗費(fèi)的時(shí)間,即大部分時(shí)間都花在了等待 I/O 操作完成上。
Java 中如何獲取當(dāng)前 CPU 的核心數(shù)?
答:使用int cores = Runtime.getRuntime().availableProcessors();
這行代碼即可獲取當(dāng)前 CPU 的核心數(shù)。
對(duì)于 CPU 密集型任務(wù),線程池的大小為什么不設(shè)置為 n,而是 n + 1?
答:對(duì)于 CPU 密集型任務(wù),在有 n 個(gè)處理器的系統(tǒng)上,當(dāng)線程池的大小設(shè)置為 n + 1 時(shí),能實(shí)現(xiàn) CPU 的最優(yōu)利用率,因?yàn)榧词巩?dāng) CPU 密集型的線程偶爾由于頁(yè)缺失故障或者其他原因暫停時(shí),這個(gè)“額外”的線程也能確保 CPU 的時(shí)鐘周期不會(huì)被浪費(fèi),從未盡可能地利用 CPU 資源。
為什么對(duì)于 I/O 密集型任務(wù),線程數(shù)建議設(shè)置為 2 * n?
因?yàn)樵?I/O 密集型任務(wù)中,大部分時(shí)間線程都在等待外部 I/O 操作完成,而不是進(jìn)行計(jì)算密集型的工作,并且用于上下文切換的時(shí)間要遠(yuǎn)小于阻塞等待的時(shí)間。例如,當(dāng)線程在從磁盤讀取數(shù)據(jù)或等待網(wǎng)絡(luò)響應(yīng)時(shí),它會(huì)處于阻塞狀態(tài),CPU 資源不會(huì)被過(guò)多占用。
假設(shè)在一個(gè)請(qǐng)求中,計(jì)算操作需要 5ms,I/O 操作需要 100 ms。對(duì)于一臺(tái)有 8 核的機(jī)器,如果要求 CPU 利用率達(dá)到 100%,應(yīng)該如何設(shè)置線程數(shù)?
- 對(duì)于復(fù)合型任務(wù),我們可以綜合考慮 CPU 密集型和 I/O 密集型的特點(diǎn)進(jìn)行設(shè)置。
- 在這種情況下,如果計(jì)算操作需要 5ms(CPU 操作 5ms),而 I/O 操作需要 100ms,那么單核 CPU 的利用率就是 5 / (5+100)。
- 為了達(dá)到理論上的 100% CPU 利用率,需要的線程數(shù)為 1 / (5 / (5 + 100)),即 21 個(gè)線程。
- 然而,現(xiàn)在我們只有 8 個(gè)核可用,所以根據(jù)這個(gè)理論,需要的線程數(shù)是 168 個(gè)(8 核 * 21 線程)。
- 不過(guò),這是一個(gè)理論值,實(shí)際情況可能受其他因素的限制。
如何對(duì)線程池進(jìn)行監(jiān)控?
(1)如果在系統(tǒng)中大量使用線程池,則有必要對(duì)線程池進(jìn)行監(jiān)控,方便在出現(xiàn)問(wèn)題時(shí),可以根據(jù)線程池的使用狀況快速定位問(wèn)題。可以通過(guò)線程池提供的參數(shù)進(jìn)行監(jiān)控,在監(jiān)控線程池的時(shí)候可以使用以下屬性:
- taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
- completedTaskCount:線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量,小于或等于taskCount。
- largestPoolSize:線程池里曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過(guò)。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過(guò)。
- getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會(huì)自動(dòng)銷毀,所以這個(gè)大小只增不減。
- getActiveCount:獲取活動(dòng)的線程數(shù)。
(2)此外,我們還可以通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控。可以通過(guò)繼承線程池來(lái)自定義線程池,重寫線程池的 beforeExecute
、afterExecute
和 terminated
方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來(lái)進(jìn)行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時(shí)間、最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等,這幾個(gè)方法在線程池里是空方法。
如何動(dòng)態(tài)地修改線程池的參數(shù)?
ThreadPoolExecutor
類中提供了以下這些方法來(lái)動(dòng)態(tài)修改參數(shù):
線程池使用的注意事項(xiàng)有哪些?線程池中的線程是不是線程越多越好?
(1)在線程池的使用中,有一些注意事項(xiàng)需要考慮:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-470321.html
- 合理配置線程池大小:線程池的大小應(yīng)根據(jù)處理任務(wù)的類型和數(shù)量來(lái)進(jìn)行配置。如果線程池的線程數(shù)量過(guò)少,可能無(wú)法及時(shí)處理所有的任務(wù);而線程數(shù)量過(guò)多,則會(huì)消耗過(guò)多的系統(tǒng)資源。需要根據(jù)實(shí)際情況進(jìn)行調(diào)整,確保線程池的大小可以適應(yīng)任務(wù)負(fù)載。
- 考慮線程的生命周期:線程池中的線程是可以重用的,所以在設(shè)計(jì)任務(wù)時(shí),要考慮到線程的生命周期,避免出現(xiàn)線程不會(huì)自動(dòng)終止的情況。如果線程的任務(wù)執(zhí)行完畢后,沒(méi)有新的任務(wù)可以執(zhí)行,可以通過(guò)合理的配置線程存活時(shí)間、設(shè)置超時(shí)時(shí)間等機(jī)制來(lái)避免空閑線程一直存在。
- 任務(wù)類型和任務(wù)隊(duì)列:根據(jù)任務(wù)的特點(diǎn)選擇適當(dāng)?shù)娜蝿?wù)隊(duì)列類型。例如,對(duì)于耗時(shí)較長(zhǎng)的I/O密集型任務(wù),可以使用無(wú)界隊(duì)列;而對(duì)于CPU密集型任務(wù),可能需要使用有界隊(duì)列來(lái)限制任務(wù)的提交速度,避免系統(tǒng)資源被耗盡。選擇適當(dāng)?shù)娜蝿?wù)隊(duì)列類型可以對(duì)系統(tǒng)性能產(chǎn)生重要影響。
- 異常處理:在任務(wù)執(zhí)行過(guò)程中,可能會(huì)出現(xiàn)異常。要確保及時(shí)捕獲并處理異常,避免線程池因?yàn)楫惓G闆r而終止或發(fā)生其他意外。
- 監(jiān)控和調(diào)優(yōu):及時(shí)監(jiān)控線程池的運(yùn)行狀態(tài)、任務(wù)執(zhí)行情況和系統(tǒng)資源的使用情況,以便及時(shí)調(diào)整線程池的配置參數(shù),提高系統(tǒng)性能和穩(wěn)定性。
(2)關(guān)于線程數(shù)量的問(wèn)題,更多不一定意味著更好。線程數(shù)量的增加會(huì)增加線程間的上下文切換開銷,并且會(huì)占用更多的系統(tǒng)資源。當(dāng)線程數(shù)量超出合理范圍時(shí),可能會(huì)導(dǎo)致系統(tǒng)負(fù)載過(guò)重,性能反而下降。因此,需要根據(jù)實(shí)際需求和系統(tǒng)資源進(jìn)行評(píng)估,并選擇適當(dāng)?shù)木€程數(shù)量。通常情況下,可以依據(jù)CPU核心數(shù)和任務(wù)類型進(jìn)行參考,一般不建議無(wú)限制增加線程數(shù)量。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-470321.html
到了這里,關(guān)于Java并發(fā)編程面試題——線程池的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!