??作者簡介:大家好,我是若明天不見,BAT的Java高級開發(fā)工程師,CSDN博客專家,后端領域優(yōu)質創(chuàng)作者
??系列專欄:多線程及高并發(fā)系列
??其他專欄:微服務框架系列、MySQL系列、Redis系列、Leetcode算法系列、GraphQL系列
??如果感覺博主的文章還不錯的話,請??點贊收藏關注??支持一下博主哦??
?時間是條環(huán)形跑道,萬物終將歸零,亦得以圓全完美
多線程及高并發(fā)系列
- 【多線程及高并發(fā) 一】內存模型及理論基礎
- 【多線程及高并發(fā) 二】線程基礎及線程中斷同步
- 【多線程及高并發(fā) 三】volatile & synchorized 詳解
- 【多線程與高并發(fā) 四】CAS、Unsafe 及 JUC 原子類詳解
- 【多線程及高并發(fā) 五】AQS & ReentranLock 詳解
- 【多線程及高并發(fā) 番外篇】虛擬線程怎么被 synchronized 阻塞了?
在 Java 并發(fā)編程中,BlockingQueue
、Future
、FutureTask
和ThreadPoolExecutor
是相互關聯的重要概念和組件
- BlockingQueue:是一個支持線程安全的、阻塞操作的隊列。提供了線程間的數據傳遞機制
- Future:是一個接口,表示一個異步計算的結果。提供了異步任務的結果獲取機制
-
FutureTask: 是
Future
的實現類,同時也是一個可執(zhí)行的任務 -
ThreadPoolExecutor:線程池是一個線程管理的工具,用于管理和復用線程資源。管理和調度任務的執(zhí)行,將任務封裝成
FutureTask
并通過BlockingQueue
進行交互
BlockingQueue
BlockingQueue
是一個支持線程安全的、阻塞操作的隊列,它的實現類都有這兩個特性,在后文介紹時就不詳細介紹了。常見的實現類有ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
等。BlockingQueue
在并發(fā)編程中廣泛應用于實現生產者-消費者模式,其中生產者將數據放入隊列,消費者從隊列中取出數據進行處理
BlockingQueue 的阻塞操作(如 put() 和 take())可以確保生產者和消費者之間的同步,避免了線程之間的競爭條件
BlockingQueue
類型:
-
ArrayBlockingQueue
:由數組結構組成的有界阻塞隊列 -
LinkedBlockingQueue
:由鏈表結構組成的有界阻塞隊列 -
PriorityBlockingQueue
:支持優(yōu)先級排序的無界阻塞隊列 -
DealyQueue
:使用優(yōu)先級隊列實現的無界阻塞隊列 -
SynchronousQueue
:不存儲元素的阻塞隊列 -
LinkedTransferQueue
:由鏈表結構組成的無界阻塞隊列 -
LinkedBlockingDeque
:由鏈表結構組成的雙向阻塞隊列
ConcurrentLinkedQueue
是一個線程安全的無界隊列實現,隊列按照FIFO
原則對元素進行排序。使用 鏈表數據結構 和 CAS 操作來實現高并發(fā)的插入和提取操作
BlockingQueue 具有 4 組不同的方法用于插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執(zhí)行的話,每個方法的表現也不同。這些方法如下:
拋異常 | 特定值 | 阻塞 | 超時 |
---|---|---|---|
插入 | add(o) | offer(o) | put(o) |
移除 | remove() | poll() | take() |
檢查 | element() | peek() |
BlockingQueue & BlockingDeque 對比
BlockingQueue
和BlockingDeque
是Java中用于多線程編程的接口,它們都提供了阻塞操作的功能,但在使用方式和特性上有一些異同。
相同點:
- 都是用于在多線程環(huán)境下進行安全的數據交換的接口
- 都提供了阻塞操作,即在隊列為空時,獲取元素的操作會被阻塞,直到隊列中有元素可用;在隊列已滿時,插入元素的操作會被阻塞,直到隊列有空閑位置
不同點:
-
數據結構差異:
-
BlockingQueue
是一種隊列,它按照先進先出(FIFO)的順序處理元素 -
BlockingDeque
是一種雙端隊列,它允許在隊列的兩端進行插入和提取操作
-
-
操作的位置差異:
-
BlockingQueue
的操作只涉及到隊列的一端,即插入和提取操作只發(fā)生在隊列的一端 -
BlockingDeque
的操作可以在隊列的兩端進行,可以在隊列的頭部和尾部進行插入和提取操作
-
BlockingDeque
為雙端隊列,因此可以根據使用方式模擬堆或棧的特性
BlockingDeque
模擬棧的使用示例:
public class BlockingDequeStackExample {
private BlockingDeque<Integer> stack;
public BlockingDequeStackExample() {
// 創(chuàng)建一個雙端阻塞隊列作為棧的實現
stack = new LinkedBlockingDeque<>();
}
public void push(int element) {
// 在隊列的頭部插入元素,模擬入棧操作。同addFirst方法
stack.push(element);
System.out.println("Pushed element: " + element);
}
public int pop() {
// 從隊列的頭部提取元素,模擬出棧操作。同removeFirst方法
int element = stack.pop();
System.out.println("Popped element: " + element);
return element;
}
public static void main(String[] args) {
BlockingDequeStackExample stackExample = new BlockingDequeStackExample();
// 模擬入棧和出棧操作
stackExample.push(1);
stackExample.push(2);
stackExample.push(3);
stackExample.pop();
stackExample.pop();
stackExample.pop();
}
}
ArrayBlockingQueue & LinkedBlockingQueue 對比
ArrayBlockingQueue
和LinkedBlockingQueue
都是Java中的阻塞隊列,一個是數組結構,一個是鏈表結構
異同點如下:
-
實現方式:
-
ArrayBlockingQueue
基于數組實現,內部使用ReentrantLock
來保證線程安全 -
LinkedBlockingQueue
基于鏈表實現,內部使用兩個鎖(一個用于生產者,一個用于消費者)來保證線程安全
-
-
長度限制:
-
ArrayBlockingQueue
在創(chuàng)建時需要指定一個固定的容量,即隊列的長度是固定的,不能動態(tài)改變 -
LinkedBlockingQueue
可以選擇在創(chuàng)建時指定一個可選的固定容量,如果未指定,則默認為 Integer.MAX_VALUE,即隊列長度可以無限擴展
-
-
內存消耗:
-
ArrayBlockingQueue
使用數組作為底層數據結構,因此在創(chuàng)建時需要預分配固定大小的內存空間,即使隊列中只有少量元素,也會占用整個數組的空間 -
LinkedBlockingQueue
使用鏈表作為底層數據結構,內存空間按需分配,只會占用實際元素所需的內存空間
-
-
公平性:
-
ArrayBlockingQueue
和LinkedBlockingQueue
都支持公平性設置。公平性表示線程是否按照它們加入隊列的順序來獲取元素。當設置為公平模式時,線程將按照先進先出的順序獲取元素,但會對性能產生一定影響。默認情況下,ArrayBlockingQueue
和LinkedBlockingQueue
都是非公平, -
ArrayBlockingQueue
可以通過構造函數指定使用公平鎖的ReentranLock
-
-
性能差異:
- 由于內部實現方式不同,
ArrayBlockingQueue
在高并發(fā)環(huán)境下的性能通常優(yōu)于LinkedBlockingQueue
。這是因為ArrayBlockingQueue
使用單鎖來保證線程安全,而LinkedBlockingQueue
使用兩個鎖,增加了一些額外的開銷
- 由于內部實現方式不同,
根據具體的使用場景和需求,可以選擇適合的阻塞隊列實現。最后根據場景,控制變量后分別壓測,選擇最合適的阻塞隊列
PriorityBlockingQueue
PriorityBlockingQueue
是Java中的一個基于優(yōu)先級的無界阻塞隊列。它具有以下特性:
-
按優(yōu)先級排序:
PriorityBlockingQueue
會根據元素的優(yōu)先級進行排序。優(yōu)先級高的元素在隊列中排在前面。元素的優(yōu)先級可以通過元素自身的比較器(Comparator
)或者元素自身的自然順序來確定 -
無界隊列:
PriorityBlockingQueue
沒有容量限制,可以根據需要動態(tài)地添加元素。它不會出現因隊列已滿而阻塞添加操作的情況 - 線程安全
- 阻塞操作
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueExample {
public static void main(String[] args) {
// 創(chuàng)建一個PriorityBlockingQueue實例
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
// 添加元素到隊列中
queue.offer(5);
queue.offer(3);
queue.offer(1);
queue.offer(4);
queue.offer(2);
// 提取并打印隊列中的元素 12345
while (!queue.isEmpty()) {
int element = queue.poll();
System.out.println("Polled element: " + element);
}
}
}
在實際應用中,PriorityBlockingQueue可用于實現任務調度、優(yōu)先級隊列等場景,其中需要按照優(yōu)先級處理元素
DelayQueue
DelayQueue是Java中的一個基于延遲時間的阻塞隊列。它具有以下特性:
-
延遲處理:
DelayQueue
中的元素必須實現Delayed
接口。Delayed
接口定義了一個getDelay(TimeUnit unit)
方法,用于獲取元素的剩余延遲時間。只有當延遲時間小于等于零時,元素才可以從隊列中提取 -
按延遲時間排序:
DelayQueue
根據元素的延遲時間進行排序。延遲時間越短的元素在隊列中排在前面 -
無界隊列:
DelayQueue
沒有容量限制,可以根據需要動態(tài)地添加元素。它不會出現因隊列已滿而阻塞添加操作的情況 - 線程安全
- 阻塞操作
在下述示例中,我們創(chuàng)建了一個DelayQueue
實例,并添加了一些延遲元素。延遲元素的延遲時間通過構造函數指定,并在getDelay
方法中計算剩余延遲時間
public class DelayQueueExample {
static class DelayedElement implements Delayed {
private String value;
private long endTime;
public DelayedElement(String value, long delayMs) {
this.value = value;
this.endTime = System.currentTimeMillis() + delayMs;
}
@Override
public long getDelay(TimeUnit unit) {
long remainingTime = endTime - System.currentTimeMillis();
return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
return Long.compare(diff, 0);
}
@Override
public String toString() {
return "DelayedElement{" +
"value='" + value + '\'' +
", endTime=" + endTime +
'}';
}
}
public static void main(String[] args) {
// 創(chuàng)建一個DelayQueue實例
DelayQueue<DelayedElement> queue = new DelayQueue<>();
// 添加延遲元素到隊列中
queue.offer(new DelayedElement("Element 1", 2000));
queue.offer(new DelayedElement("Element 2", 5000));
queue.offer(new DelayedElement("Element 3", 3000));
// 提取并打印延遲元素
while (!queue.isEmpty()) {
try {
DelayedElement element = queue.take();
System.out.println("Polled element: " + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用take
方法從隊列中提取延遲元素,并將其打印出來。由于DelayQueue
是一個阻塞隊列,當隊列為空時,提取操作會被阻塞,直到有元素的延遲時間到期
輸出結果示例
Polled element: DelayedElement{value='Element 1', endTime=1641418006091}
Polled element: DelayedElement{value='Element 3', endTime=1641418009091}
Polled element: DelayedElement{value='Element 2', endTime=1641418013091}
在實際應用中,DelayQueue可用于實現定時任務、緩存過期等場景,其中需要根據延遲時間對元素進行排序和處理
Future
Future
用于異步結果計算。它提供了一些方法來檢查計算是否完成,使用get
方法將阻塞線程直到結果返回
-
cancel
:嘗試取消任務的執(zhí)行,如果任務已完成或已取消,此操作無效 -
isCancelled
:任務是否已取消 -
isDone
:任務是否已完成 -
get
:阻塞線程以獲取計算結果,直至任務執(zhí)行完畢返回結果 -
get(long timeout, TimeUnit unit)
:阻塞線程以獲取計算結果,若在指定時間沒返回結果,則返回null
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
結合線程池的使用
public void futureTest(){
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> nickFuture = executorService.submit(() -> userService.getNick());
Future<String> nameFuture = executorService.submit(() -> userService.getUserName());
// 阻塞開始,等待結果
String nick = nickFuture.get(1000, TimeUnit.MILLISECONDS);
String name = nameFuture.get();
}
FutureTask
FutureTask
是 Java 中的一個類,實現了Future
接口,同時也可以用作可執(zhí)行的任務,特別適用于需要異步執(zhí)行任務并獲取結果的場景。FutureTask
常用來封裝Callable
和Runnable
,將任務提交給線程池執(zhí)行,并通過FutureTask
獲取任務的執(zhí)行結果。同時,FutureTask
也提供了一些方法來管理和控制任務的執(zhí)行狀態(tài)、取消任務的執(zhí)行,并處理任務執(zhí)行過程中的異常
FutureTask
是使用CAS操作來實現對任務狀態(tài)的并發(fā)操作的。CAS機制保證了對任務狀態(tài)的更新操作是原子性的,避免了競態(tài)條件和數據不一致的問題
FutureTask
主要包括以下幾種狀態(tài):
- NEW:任務的初始狀態(tài),表示任務尚未執(zhí)行
- COMPLETING:表示任務正在執(zhí)行完成的過程中,但結果尚未設置完畢
- NORMAL:任務執(zhí)行成功完成
- EXCEPTIONAL:任務執(zhí)行過程中發(fā)生了異常
- CANCELLED:任務被取消
- INTERRUPTING:任務正在被中斷的過程中
- INTERRUPTED:任務被中斷
ThreadPoolExecutor 線程池
ThreadPoolExecutor 是 Java 中 Executor 框架提供的一個線程池實現類。它提供了一種方便的方式來管理和復用線程,并執(zhí)行提交的任務。線程池在內部實際上構建了一個生產者消費者模型,將線程和任務兩者解耦,并不直接關聯,從而良好的緩沖任務,復用線程
常用默認實現:
-
Executors#newCachedThreadPool
:無邊界線程池,帶有自動線程回收 -
Executors#newFixedThreadPool
:固定大小的線程池 -
Executors#newSingleThreadExecutor
:單個后臺線程,大多數場景用于預初始化配置
有需要執(zhí)行的任務進入線程池時
- 當前線程數小于核心線程數時,創(chuàng)建線程。
- 當前線程數大于等于核心線程數,且工作隊列未滿時,將任務放入工作隊列。
- 當前線程數大于等于核心線程數,且工作隊列已滿
- 若線程數小于最大線程數,創(chuàng)建線程
- 若線程數等于最大線程數,拋出異常,拒絕任務(具體處理方式取決于
handler
的策略)
當ThreadPoolExecutor
執(zhí)行execute
方法時,當前worker數小于corePoolSize
,會調用addWorker
方法,而workers.add(w)
是在ReentranLock全局鎖里執(zhí)行的,可能會導致以下問題:
-
阻塞其他線程:在
ReentrantLock
的鎖范圍內執(zhí)行workers.add(w)
操作,那么其他線程在嘗試獲取該鎖時將被阻塞,直到當前線程釋放鎖。這可能會導致其他線程在等待期間出現延遲或阻塞 -
性能下降:它們將按順序等待
ReentrantLock
的釋放。這可能導致線程競爭和延遲,從而降低整體性能
預熱線程池是一種優(yōu)化方法,可以在系統(tǒng)啟動時提前創(chuàng)建一定數量的線程,以減少在系統(tǒng)運行時動態(tài)創(chuàng)建線程的開銷
配置參數
-
corePoolSize
核心線程數。空閑時仍會保留在池中的線程數,除非設置了allowCoreThreadTimeOut
參數 -
maximumPoolSize
最大線程數。允許在池中的最大線程數 -
keepAliveTime
存活時間。當前線程數大于核心線程數時,空余線程的最長存活時間 -
unit
單位。keepAliveTime
參數的時間單位 -
workQueue
工作隊列,接口類為阻塞隊列。任務執(zhí)行前存儲的隊列,只有通過submit
方法提交的任務才會進入隊列 -
threadFactory
線程工廠。創(chuàng)建線程。默認使用Executors.defaultThreadFactory()
,所有的線程都屬于同一個ThreadGroup
,都有相同的優(yōu)先級,且均不是守護線程。
(可用new NamedThreadFactory("test")
來對線程池中的線程添加前綴標識) -
handler
任務丟棄策略。若線程池已經關閉、或線程池已滿,那么新的任務會被拒絕。-
ThreadPoolExecutor.AbortPolicy
:丟棄任務并拋出RejectedExecutionException
異常 -
ThreadPoolExecutor.DiscardPolicy
:丟棄任務,但不拋出異常。 -
ThreadPoolExecutor.DiscardOldestPolicy
:丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(循環(huán)此過程) -
ThreadPoolExecutor.CallerRunsPolicy
:由調用線程處理該任務
-
合理配置線程池
-
線程池必須手動通過
ThreadPoolExecutor
的構造函數來聲明,避免使用Executors
類創(chuàng)建線程池,否則會因為使用了無界隊列或任務隊列最大長度為 Integer.MAX_VALUE,導致堆積大量的請求 會有 OOM 風險 -
推薦使用有界隊列,可以有效地控制線程池占用的內存和其他資源的數量,且
maximumPoolSize
配置能排上用場
如果線程池的工作隊列已滿,但是線程池的線程數還沒有達到maximumPoolSize
,那么線程池會創(chuàng)建新的非核心線程來處理這些任務,以避免任務積壓和系統(tǒng)性能下降。
- corePoolSize 配置
-
CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,可以將線程數設置為 N(CPU 核心數)+1。比 CPU 核心數多出來的一個線程是為了防止線程偶發(fā)的缺頁中斷,或者其它原因導致的任務暫停而帶來的影響。一旦任務暫停,CPU 就會處于空閑狀態(tài),而在這種情況下多出來的一個線程就可以充分利用 CPU 的空閑時間。
-
I/O 密集型任務(2N): 這種任務應用起來,系統(tǒng)會用大部分的時間來處理 I/O 交互,而線程在處理 I/O 的時間段內不會占用 CPU 來處理,這時就可以將 CPU 交出給其它線程使用
線程池監(jiān)控
可以利用ThreadPoolExecutor
的相關API
做一個基礎的監(jiān)控。從下圖可以看出,ThreadPoolExecutor
提供了獲取線程池當前的線程數和活躍線程數、已經執(zhí)行完成的任務數、正在排隊中的任務數等等
也可以使用 SpringBoot 中的 Actuator 組件或 有監(jiān)控功能的開源動態(tài)線程池Dynamic TP
動態(tài)化線程池
通過ThreadPoolExecutor
提供的 public 方法可以動態(tài)修改參數配置
注意的是程序運行期間的時候,我們調用
setCorePoolSize()
這個方法的話,線程池會首先判斷當前工作線程數是否大于corePoolSize
,如果大于的話就會回收工作線程
更多動態(tài)修改線程池參數的功能,可以使用開源軟件:
- Hippo4jopen:異步線程池框架,支持線程池動態(tài)變更&監(jiān)控&報警,無需修改代碼輕松引入。支持多種使用模式,輕松引入,致力于提高系統(tǒng)運行保障能力
- Dynamic TP:輕量級動態(tài)線程池,內置監(jiān)控告警功能,集成三方中間件線程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通過 SPI 自定義實現)
@Async 自定義線程池
在@Async
注解在使用時,不指定線程池的名稱,默認SimpleAsyncTaskExecutor
線程池。
默認的線程池配置為核心線程數為8,等待隊列為無界隊列,即當所有核心線程都在執(zhí)行任務時,后面的任務會進入隊列等待,若邏輯執(zhí)行速度較慢會導致線程池阻塞,從而出現監(jiān)聽器拋棄和無響應的結果
spring默認線程池配置參數
org.springframework.boot.autoconfigure.task.TaskExecutionProperties
/**
* Configuration properties for task execution.
*
* @author Stephane Nicoll
* @since 2.1.0
*/
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
private final Pool pool = new Pool();
/**
* Prefix to use for the names of newly created threads.
*/
private String threadNamePrefix = "task-";
public static class Pool {
/**
* Queue capacity. An unbounded capacity does not increase the pool and therefore
* ignores the "max-size" property.
*/
private int queueCapacity = Integer.MAX_VALUE;
/**
* Core number of threads.
*/
private int coreSize = 8;
/**
* Maximum allowed number of threads. If tasks are filling up the queue, the pool
* can expand up to that size to accommodate the load. Ignored if the queue is
* unbounded.
*/
private int maxSize = Integer.MAX_VALUE;
/**
* Whether core threads are allowed to time out. This enables dynamic growing and
* shrinking of the pool.
*/
private boolean allowCoreThreadTimeout = true;
/**
* Time limit for which threads may remain idle before being terminated.
*/
private Duration keepAlive = Duration.ofSeconds(60);
//getter/setter
}
}
線程池和 ThreadLocal 共用的坑
線程池和 ThreadLocal共用,可能會導致線程從ThreadLocal獲取到的是舊值/臟數據。這是因為線程池會復用線程對象,與線程對象綁定的類的靜態(tài)屬性 ThreadLocal 變量也會被重用,這就導致一個線程可能獲取到其他線程的ThreadLocal 值
阿里開源的TransmittableThreadLocal(TTL)能解決線程池中ThreadLocal的問題。
TransmittableThreadLocal
類繼承并加強了 JDK 內置的InheritableThreadLocal
類,在使用線程池等會池化復用線程的執(zhí)行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執(zhí)行時上下文傳遞的問題文章來源:http://www.zghlxwxcb.cn/news/detail-790362.html
參考資料:文章來源地址http://www.zghlxwxcb.cn/news/detail-790362.html
- Java線程池實現原理及其在美團業(yè)務中的實踐
- Java 線程池最佳實踐
- Java 線程池作用及類型
- Java 并發(fā)編程 Future及CompletionService
- TransmittableThreadLocal(TTL)
- 案例分析|線程池相關故障梳理&總結
到了這里,關于【多線程及高并發(fā) 六】并發(fā)集合及線程池詳解的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!