項目源碼
真誠的希望能給我項目一個stars?。?!
項目源碼
項目視頻演示
線程池概念
線程池(Thread Pool)是一種基于池化思想管理線程的工具,經(jīng)常出現(xiàn)在多線程服務器中,如Tomcat。
線程過多會帶來額外的開銷,其中包括創(chuàng)建銷毀線程的開銷、調(diào)度線程的開銷等等,同時也降低了計算機的整體性能。線程池維護多個線程,等待監(jiān)督管理者分配可并發(fā)執(zhí)行的任務。這種做法,一方面避免了處理任務時創(chuàng)建銷毀線程開銷的代價,另一方面避免了線程數(shù)量膨脹導致的過分調(diào)度問題,保證了對內(nèi)核的充分利用。
線程池解決的核心問題就是資源管理問題。在并發(fā)環(huán)境下,系統(tǒng)不能夠確定在任意時刻中,有多少任務需要執(zhí)行,有多少資源需要投入。這種不確定性將帶來以下若干問題:
頻繁申請/銷毀資源和調(diào)度資源,將帶來額外的消耗,可能會非常巨大。
對資源無限申請缺少抑制手段,易引發(fā)系統(tǒng)資源耗盡的風險。
系統(tǒng)無法合理管理內(nèi)部的資源分布,會降低系統(tǒng)的穩(wěn)定性。
為解決資源分配這個問題,線程池采用了“池化”(Pooling)思想。池化,顧名思義,是為了最大化收益并最小化風險,而將資源統(tǒng)一在一起管理的一種思想。
使用線程池的好處如下:
降低資源消耗:通過池化技術(shù)重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
提高響應速度:任務到達時,無需等待線程創(chuàng)建即可立即執(zhí)行。
提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會消耗系統(tǒng)資源,還會因為線程的不合理分布導致資源調(diào)度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
提供更多更強大的功能:線程池具備可拓展性,允許開發(fā)人員向其中增加更多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執(zhí)行或定期執(zhí)行。
過多的概念以及線程池中的核心參數(shù)這些問題我就不講解了,可以直接看下面這篇文章
線程池概念以及核心參數(shù)講解
ThreadPoolExecutor介紹
先看一下ThreadPoolExecutor的類圖。
ThreadPoolExecutor實現(xiàn)的頂層接口是Executor,頂層接口Executor提供了一種思想:將任務提交和任務執(zhí)行進行解耦。用戶無需關注如何創(chuàng)建線程,如何調(diào)度線程來執(zhí)行任務,用戶只需提供Runnable對象,將任務的運行邏輯提交到執(zhí)行器(Executor)中,由Executor框架完成線程的調(diào)配和任務的執(zhí)行部分。ExecutorService接口增加了一些能力:(1)擴充執(zhí)行任務的能力,補充可以為一個或一批異步任務生成Future的方法;(2)提供了管控線程池的方法,比如停止線程池的運行。AbstractExecutorService則是上層的抽象類,將執(zhí)行任務的流程串聯(lián)了起來,保證下層的實現(xiàn)只需關注一個執(zhí)行任務的方法即可。最下層的實現(xiàn)類ThreadPoolExecutor實現(xiàn)最復雜的運行部分,ThreadPoolExecutor將會一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結(jié)合從而執(zhí)行并行任務。
ThreadPoolExecutor的執(zhí)行機制如下圖。
(來源網(wǎng)絡)
上圖表明了當一個任務過來的時候,三種處理情況,直接拒絕,直接執(zhí)行,或者是放入緩沖區(qū)。
線程池的本質(zhì)是對任務和線程的管理,而做到這一點最關鍵的思想就是將任務和線程兩者解耦,不讓兩者直接關聯(lián),才可以做后續(xù)的分配工作。**線程池中是以生產(chǎn)者消費者模式,通過一個阻塞隊列來實現(xiàn)的。**阻塞隊列緩存任務,工作線程從阻塞隊列中獲取任務。
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。
上文中也提到了幾個BlockingQueue了,這里不做贅述。
這里也不在講解線程池的任務申請流程了。
Nacos
我們知道,使用Nacos作為注冊中心和配置中心的好處在于,如果我們的配置修改了,我們的項目是可以實時監(jiān)控到的并且進行配置的同步更新。
我們可以按照如上的方式來配置線程池的參數(shù),這是完全沒問題的。
我們也知道,我們把這里的參數(shù)修改之后,我們的代碼里面也會同步的修改這些值為新值。
但是有一個問題,我們知道,我們創(chuàng)建線程池的時候都是new一個線程池出來,那么這里有一個問題就在于,我們使用的是spring項目,我們不會去說當nacos的配置文件修改之后,然后去通知項目然后new一個線程池出來,而是希望在原有的線程池的參數(shù)上進行修改。
恰巧,ThreadPoolExecutor也提供了這些方法給我們來讓我們運行時修改線程池的參數(shù)。
那么現(xiàn)在問題就變成了,我們?nèi)绾巫龅絥acos的配置更新之后,我們?nèi)绾螌σ呀?jīng)創(chuàng)建的線程池進行參數(shù)的修改呢?
肯定能最快想到的就是事件通知機制了,我們猜測nacos能實現(xiàn)本地代碼配置的實時更新,大概率也是使用了通知功能,所以我們翻看nacos的源碼可以發(fā)現(xiàn)nacos有一個方法可以添加一個監(jiān)聽器來監(jiān)聽配置文件的更新。
實現(xiàn)對Nacos配置文件更新的事件監(jiān)聽機制
那么接下來的代碼來實現(xiàn)Nacos的事件監(jiān)聽機制
spring:
application:
name: dynamic-thread-pool
profiles:
active: dev
cloud:
nacos:
discovery:
namespace: test
group: DYNAMIC_THREADPOOL
server-addr: xxxx # 填寫nacos地址
config:
server-addr: xxxx # 填寫nacos地址
group: DYNAMIC_THREADPOOL
namespace: test
file-extension: properties
shared-configs:
- data-id: dynamic-thread-pool-dev.properties
refresh: true
server:
port: 8080
然后我們編寫一個配置類,當然,這個配置類其實可有可無
package zhang.blossom.dynamic.threadpool.config;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import java.util.Properties;
/**
* nacos配置
*
* @author 張錦標
*/
@Slf4j
@Configuration
public class NacosConfig {
@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;
@Value("${spring.cloud.nacos.config.namespace}")
private String namespace;
@Bean
@Primary
public ConfigService configService() {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
properties.put("namespace", namespace);
try {
return NacosFactory.createConfigService(properties);
} catch (NacosException e) {
log.error(e.toString(), e);
}
return null;
}
}
之后我們創(chuàng)建一個監(jiān)聽器,有多種方法,我們直接@Bean創(chuàng)建一個我們配置好的ConfigService也可以,當然,我這里選擇的是實現(xiàn)ApplicationRunner接口,這樣子可以做到項目啟動成功后會自動執(zhí)行run方法。
代碼如下
package zhang.blossom.dynamic.threadpool.listener;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import zhang.blossom.dynamic.threadpool.core.ResizableCapacityLinkedBlockIngQueue;
import javax.annotation.Resource;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 項目啟動后添加監(jiān)聽,實現(xiàn)ApplicationRunner,
* 在項目成功后會自動執(zhí)行run方法
*
* @author 張錦標
* @version 1.0
*/
@Slf4j
@Component
public class NacosListener implements ApplicationRunner {
@Resource
private ConfigService configService;
@Value("${spring.cloud.nacos.config.group}")
private String groupId;
public static final String DATA_ID = "dynamic-thread-pool-dev.properties";
@Autowired
@Qualifier("commonThreadPool")
private ThreadPoolExecutor threadPoolExecutor;
@Override
public void run(ApplicationArguments args) throws Exception {
//添加nacos配置文件監(jiān)聽
listenerNacosConfig();
}
/**
* 監(jiān)聽數(shù)據(jù)源變化
*
* @throws Exception 異常
*/
private void listenerNacosConfig() throws Exception {
configService.addListener(DATA_ID, groupId, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
//configInfo是一個字符串,它的內(nèi)容就是你配置文件里所有的內(nèi)容
//這里推薦配置文件使用properties方式,不然后續(xù)不好處理
}
@Override
public Executor getExecutor() {
return null;
}
});
}
/**
* 向nacos發(fā)布內(nèi)容
* 會直接覆寫原本的配置文件,請謹慎使用
* @param content 內(nèi)容
* @throws Exception 異常
*/
private void publishConfig(String content) throws Exception {
//發(fā)布內(nèi)容
configService.publishConfig(DATA_ID, groupId, content);
}
}
ok,然后我現(xiàn)在創(chuàng)建一個配置類,來讀取nacos上面對應的參數(shù)。
package zhang.blossom.dynamic.threadpool.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* @author: 張錦標
* @date: 2023/6/15 11:25
* ThreadPoolProperty類
*/
//@Configuration
@RefreshScope
@Component
@ConfigurationProperties("dynamic.threadpool")
public class ThreadPoolProperty {
//@Value("${dynamic.threadpool.corePoolSize}")
private Integer corePoolSize;
//@Value("${dynamic.threadpool.maximumPoolSize}")
private Integer maximumPoolSize;
//@Value("${dynamic.threadpool.queueCapacity}")
private Integer queueCapacity;
public Integer getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}
public Integer getMaximumPoolSize() {
return maximumPoolSize;
}
public void setMaximumPoolSize(Integer maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
public Integer getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(Integer queueCapacity) {
this.queueCapacity = queueCapacity;
}
}
之后, 啟動項目就可以得到我們在nacos上配置的參數(shù)的值了。
下面是一個測試:
package zhang.blossom.dynamic.threadpool.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import zhang.blossom.dynamic.threadpool.config.ThreadPoolProperty;
/**
* @author: 張錦標
* @date: 2023/6/15 11:59
* TestController類
*/
@RestController
public class TestController {
@Autowired
private ThreadPoolProperty threadPoolProperty;
@GetMapping("/get")
public String getValue(){
return threadPoolProperty.getMaximumPoolSize() +" "+
threadPoolProperty.getCorePoolSize()+" "+
threadPoolProperty.getQueueCapacity();
}
}
定時通知功能
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
使用xxl-job來實現(xiàn)定時通知功能文章來源:http://www.zghlxwxcb.cn/news/detail-491385.html
package zhang.blossom.dynamic.threadpool.handler;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import zhang.blossom.dynamic.threadpool.service.SendMailService;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 張錦標
* @version 1.0
*/
@Component
public class SimpleXxxJob {
@Autowired
private SendMailService sendMailService;
@Autowired
@Qualifier("commonThreadPool")
private ThreadPoolExecutor threadPoolExecutor;
//使用XXLJOB注解定義一個job
@XxlJob(value = "sendMailHandler", init = "initHandler", destroy = "destroyHandler")
public void sendMailHandler() {
sendMailService.sendMailToWarn(threadPoolExecutor);
System.out.println("發(fā)送短信成功。。。。。。。。。。。");
}
//任務初始化方法
public void initHandler() {
System.out.println("任務調(diào)用初始化方法執(zhí)行");
}
public void destroyHandler() {
System.out.println("任務執(zhí)行器被銷毀");
}
}
郵件發(fā)送通知功能
package zhang.blossom.dynamic.threadpool.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author: 張錦標
* @date: 2023/6/15 15:24
* SendMailToWarn類
*/
@Service
public class SendMailService{
@Autowired
private JavaMailSender javaMailSender;
@Value("${warn.recipient}")
public String recipient;
@Value("${warn.addresser}")
public String addresser;
public boolean sendMailToWarn(ThreadPoolExecutor threadPoolExecutor){
SimpleMailMessage simpleMailMessage = new SimpleMailMessage();
simpleMailMessage.setTo(recipient);
simpleMailMessage.setFrom(addresser);
simpleMailMessage.setSubject("線程池情況匯報");
String s = "CorePoolSize="+threadPoolExecutor.getCorePoolSize()+" "+
"LargestPoolSize="+threadPoolExecutor.getLargestPoolSize()+" "+
"MaximumPoolSize="+threadPoolExecutor.getMaximumPoolSize();
simpleMailMessage.setText(s);
javaMailSender.send(simpleMailMessage);
return true;
}
}
開始測試
這里特別需要先強調(diào)一個點,JDK提供的線程池雖然有獲得工作隊列的方法,但是目前目前實現(xiàn)的消息隊列都是不能動態(tài)修改容量的,所以你需要自己實現(xiàn)一個工作隊列。
具體可以看我項目中的代碼實現(xiàn)。
(動動你發(fā)財?shù)男∈纸o我的項目點一個stars?。。。?br>源碼地址—希望能給我一個stars作為鼓勵?。。?br> 這是項目一開始啟動的時候的線程池的參數(shù)情況。
ok,然后我們修改nacos的配置文件,使其發(fā)生監(jiān)聽事件
然后,我們的xxl-job也會定時的給我們進行消息提示
到此為止,整個項目完成。文章來源地址http://www.zghlxwxcb.cn/news/detail-491385.html
到了這里,關于【Java項目】使用Nacos實現(xiàn)動態(tài)線程池技術(shù)以及Nacos配置文件更新監(jiān)聽事件的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!