1、前言
??在一個(gè)向第三方平臺(tái)推送消息的場(chǎng)景中,為了提高程序的執(zhí)行效率,每次發(fā)送消息,都創(chuàng)建一個(gè)新的線程來(lái)完成發(fā)送消息的任務(wù),為了提供線程的使用性能,我選擇了ThreadPoolTaskExecutor線程池,結(jié)果在使用的過(guò)程中,出現(xiàn)了較多的問(wèn)題,這里記錄一下避免以后再出現(xiàn)類似的錯(cuò)誤(這些錯(cuò)誤是不應(yīng)該出現(xiàn)的,還是對(duì)ThreadPoolTaskExecutor使用不熟悉造成的)。
2、ThreadPoolTaskExecutor用法簡(jiǎn)介
??ThreadPoolTaskExecutor是Spring Framework提供的一個(gè)線程池實(shí)現(xiàn),它繼承自ThreadPoolExecutor類,并實(shí)現(xiàn)了AsyncTaskExecutor和SchedulingTaskExecutor接口,可以用于異步任務(wù)執(zhí)行和定時(shí)任務(wù)調(diào)度。
??我們通過(guò)配置類定義了一個(gè)ThreadPoolTaskExecutor的Bean,并設(shè)置了核心線程數(shù)、最大線程數(shù)、隊(duì)列容量和線程名稱前綴等參數(shù)。然后在MyService類中注入了這個(gè)線程池,并在executeTask方法中使用taskExecutor.execute()來(lái)提交一個(gè)異步任務(wù)。
??通過(guò)這種方式,我們可以方便地使用ThreadPoolTaskExecutor來(lái)執(zhí)行異步任務(wù),實(shí)現(xiàn)多線程處理和任務(wù)調(diào)度的功能。
??以下是ThreadPoolTaskExecutor的基本用法示例:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ExecutorConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("MyThread-");
executor.initialize();
return executor;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
@Component
public class MyService {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
public void executeTask() {
taskExecutor.execute(() -> {
// 執(zhí)行異步任務(wù)邏輯
// ...
});
}
}
3、ThreadPoolTaskExecutor使用過(guò)程復(fù)現(xiàn)
3.1、最開始的代碼——線程泄露
??以下代碼是最開始的一版代碼,這里出現(xiàn)了一個(gè)非常嚴(yán)重的錯(cuò)誤,直接造成了線程泄露,或者說(shuō)是完全錯(cuò)用了線程池,反而造成了更多的線程資源浪費(fèi)。
/**
* 消息推送
* 20230609 hsh
*/
public class PushNotificationService {
private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
/**
* 線程池
* @return
*/
public static Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
/**
* 消息推送
*/
public static void pushNotification(EventTask eventTask, Map<String,Object> param) {
Executor executor = getAsyncExecutor();
executor.execute(new Runnable() {
@Override
public void run() {
if(this.isSend(param)){//判斷是否需要推送
String msg = "";
sendMsg(eventInfo);
}
}
private boolean isSend(Map<String, Object> param) {
return true;
}
private String sendMsg(String msg){
return "發(fā)送";
}
});
}
}
??上述代碼的問(wèn)題,主要發(fā)生在獲取線程池的問(wèn)題上,即每次發(fā)送消息都調(diào)用了getAsyncExecutor()方法,本意是獲取一個(gè)線程,結(jié)果這里每次都會(huì)創(chuàng)建一個(gè)線程池,而且線程數(shù)至少會(huì)有10個(gè),所以每次本來(lái)只需要?jiǎng)?chuàng)建一個(gè)線程處理消息發(fā)送,結(jié)果每次都創(chuàng)建了一個(gè)線程池,每個(gè)線程池至少還有10個(gè)線程,結(jié)果就出現(xiàn)了線程泄露問(wèn)題。
3.2、第一次修改——解決線程泄露問(wèn)題,但是線程不安全,在多線程環(huán)境下還是可能會(huì)導(dǎo)致創(chuàng)建多個(gè)線程池實(shí)例
??為了解決最開始出現(xiàn)的線程泄露問(wèn)題,我把ThreadPoolTaskExecutor 作為對(duì)象的一個(gè)變量,每次獲取的時(shí)候,判斷是否為空,如果不為空時(shí),就不再創(chuàng)建了。
/**
* 消息推送
* 20230609 hsh
*/
public class PushNotificationService {
private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
private static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 線程池
* @return
*/
public static Executor getAsyncExecutor() {
if(executor == null){
executor = new ThreadPoolTaskExecutor();
}
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
//省略……
}
??如果多個(gè)線程同時(shí)調(diào)用getAsyncExecutor()方法,還是可能會(huì)導(dǎo)致創(chuàng)建多個(gè)線程池實(shí)例。
3.3、第二次修改——解決線程安全問(wèn)題,帶來(lái)了性能問(wèn)題
??為了解決線程安全問(wèn)題,我直接使用了synchronized 關(guān)鍵字,代碼如下:
/**
* 消息推送
* 20230609 hsh
*/
public class PushNotificationService {
private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
private static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 線程池
* @return
*/
public static Executor getAsyncExecutor() {
synchronized (executor){
if(executor == null){
executor = new ThreadPoolTaskExecutor();
}
executor.setCorePoolSize(30);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
}
//省略……
}
??為了解決線程安全問(wèn)題,我直接使用了synchronized 關(guān)鍵字,上述代碼雖然可以正常運(yùn)行,但是帶來(lái)了非常嚴(yán)重的性能問(wèn)題,因?yàn)閟ynchronized 關(guān)鍵字包含了整個(gè)代碼塊,就相當(dāng)于在getAsyncExecutor() 方法上使用了synchronized 關(guān)鍵字,該方法就變成了單線程執(zhí)行了,所以效率非常低。
3.4、第二次修改——解決性能問(wèn)題,帶來(lái)了初始化異常
??為了解決性能問(wèn)題,使用雙重檢查鎖定(double-checked locking)機(jī)制來(lái)確保線程安全同時(shí)保證處理性能,代碼如下:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-481550.html
/**
* 消息推送
* 20230609 hsh
*/
public class PushNotificationService {
private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
private static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**
* 線程池
* @return
*/
public static Executor getAsyncExecutor() {
if (executor == null) {
synchronized (executor) {
if (executor == null) {
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(30);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
}
}
}
return executor;
}
//省略……
}
??通過(guò)雙重檢查鎖定,確實(shí)可以實(shí)現(xiàn)性能的提升,但是這里忽略了一個(gè)細(xì)節(jié),就是ThreadPoolTaskExecutor 初始化,因?yàn)檫@里當(dāng)executor 對(duì)象不為空時(shí),直接返回了,沒(méi)有進(jìn)行initialize()操作,所以報(bào)了“ThreadPoolTaskExecutor not initialized ”錯(cuò)誤,因此聲明ThreadPoolTaskExecutor executor 對(duì)象的時(shí)候,不能使用new ThreadPoolTaskExecutor()方法進(jìn)行定義,同時(shí)為了避免指令重排序可能帶來(lái)的問(wèn)題,需要將 executor 聲明為 volatile 類型,以確保在多線程環(huán)境下的可見性和正確的初始化順序。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-481550.html
/**
* 消息推送
* 20230609 hsh
*/
public class PushNotificationService {
private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
private static volatile ThreadPoolTaskExecutor executor;
/**
* 線程池
* @return
*/
public static Executor getAsyncExecutor() {
if (executor == null) {
synchronized (executor) {
if (executor == null) {
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(30);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
}
}
}
return executor;
}
//省略……
}
4、總結(jié)
- 知識(shí)點(diǎn)1:ThreadPoolTaskExecutor用法
- 知識(shí)點(diǎn)2:synchronized 關(guān)鍵字
- 知識(shí)點(diǎn)3:雙重檢查鎖定(double-checked locking)機(jī)制
- 知識(shí)點(diǎn)4:volatile 關(guān)鍵字
到了這里,關(guān)于第一次使用ThreadPoolTaskExecutor實(shí)現(xiàn)線程池的經(jīng)歷,反復(fù)修改了多次代碼才正常使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!