背景
我們應對并發(fā)場景時一般會采用下面方式去預估線程池的線程數量,比如QPS需求是1000,平均每個任務需要執(zhí)行的時間是t秒,那么我們需要的線程數是t * 1000。
但是在一些情況下,這個t是不好估算的,即便是估算出來了,在實際的線程環(huán)境上也需要進行驗證和微調。比如在本文所闡述分頁查詢的數據項組合場景中。
1、數據組合依賴不同的上游接接口, 它們的響應時間參差不齊,甚至差距還非常大。有些接口支持批量查詢而另一些則不支持批量查詢。有些接口因為性能問題還需要考慮降級和平滑方案。
2、為了提升用戶體驗,這里的查詢設計了動態(tài)列,因此每一次訪問所需要組合的數據項和數量也是不同的。
因此這里如果需要估算出一個合理的t是不太現實的。
方案
一種可動態(tài)調節(jié)的策略,根據監(jiān)控的反饋對線程池進行微調。整體設計分為裝配邏輯和線程池封裝設計。
1、裝配邏輯
查詢結果,拆分分片(水平拆分),并行裝配(垂直拆分),獲得裝配項列表(動態(tài)列), 并行裝配每一項。
2、線程池封裝
可調節(jié)的核心線程數、最大線程數、線程保持時間,隊列大小,提交任務重試等待時間,提交任務重試次數。 固定異常拒絕策略。
調節(jié)參數:
字段 | 名稱 | 說明 |
---|---|---|
corePoolSize | 核心線程數 | 參考線程池定義 |
maximumPoolSize | 最大線程數 | 參考線程池定義 |
keepAliveTime | 線程存活時間 | 參考線程池定義 |
queueSize | 隊列長度 | 參考線程池定義 |
resubmitSleepMillis | 提交任務重試等待時間 | 添加任務被拒絕后重試時的等待時間 |
resubmitTimes | 提交任務重試次數 | 添加任務被拒絕后重試添加的最大次數 |
@Data
private static class PoolPolicy {
/** 核心線程數 */
private Integer corePoolSize;
/** 最大線程數 */
private Integer maximumPoolSize;
/** 線程存活時間 */
private Integer keepAliveTime;
/** 隊列容量 */
private Integer queueSize;
/** 重試等待時間 */
private Long resubmitSleepMillis;
/** 重試次數 */
private Integer resubmitTimes;
}
創(chuàng)建線程池:
線程池的創(chuàng)建考慮了動態(tài)的需求,滿足根據壓測結果進行微調的要求。首先緩存舊的線程池后再創(chuàng)建新的線程,當新的線程池創(chuàng)建成功后再去關閉舊的線程池。保證在這個替換過程中不影響正在執(zhí)行的業(yè)務。線程池使用了中斷策略,用戶可以及時感知到系統(tǒng)繁忙并保證了系統(tǒng)資源占用的安全。
public void reloadThreadPool(PoolPolicy poolPolicy) {
if (poolPolicy == null) {
throw new RuntimeException("The thread pool policy cannot be empty.");
}
if (poolPolicy.getCorePoolSize() == null) {
poolPolicy.setCorePoolSize(0);
}
if (poolPolicy.getMaximumPoolSize() == null) {
poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);
}
if (poolPolicy.getKeepAliveTime() == null) {
poolPolicy.setKeepAliveTime(60);
}
if (poolPolicy.getQueueSize() == null) {
poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);
}
if (poolPolicy.getResubmitSleepMillis() == null) {
poolPolicy.setResubmitSleepMillis(200L);
}
if (poolPolicy.getResubmitTimes() == null) {
poolPolicy.setResubmitTimes(5);
}
// - 線程池策略沒有變化直接返回已有線程池。
ExecutorService original = this.executorService;
this.executorService = new ThreadPoolExecutor(
poolPolicy.getCorePoolSize(),
poolPolicy.getMaximumPoolSize(),
poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,
new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),
new ThreadPoolExecutor.AbortPolicy());
this.poolPolicy = poolPolicy;
if (original != null) {
original.shutdownNow();
}
}
任務提交:
線程池封裝對象中使用的線程池拒絕策略是AbortPolicy,因此在線程數和阻塞隊列到達上限后會觸發(fā)異常。另外在這里為了保證提交的成功率利用重試策略實現了一定程度的延遲處理,具體場景中可以結合業(yè)務特點進行適當的調節(jié)和配置。
public <T> Future<T> submit(Callable<T> task) {
RejectedExecutionException exception = null;
Future<T> future = null;
for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
try {
// - 添加任務
future = this.executorService.submit(task);
exception = null;
break;
} catch (RejectedExecutionException e) {
exception = e;
this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
}
}
if (exception != null) {
throw exception;
}
return future;
}
監(jiān)控:
1、submit提交的監(jiān)控
見代碼中的「監(jiān)控點①」,在submit方法中添加監(jiān)控點,監(jiān)控key的需要添線程池封裝對象的線程名稱前綴,用于區(qū)分具體的線程池對象。
「監(jiān)控點①」用于監(jiān)控添加任務的動作是否正常,以便對線程池對象及策略參數進行微調。
public <T> Future<T> submit(Callable<T> task) {
// - 監(jiān)控點①
CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,
UmpConstant.APP_NAME,
UmpConstant.UMP_DISABLE_HEART,
UmpConstant.UMP_ENABLE_TP);
RejectedExecutionException exception = null;
Future<T> future = null;
for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
try {
// - 添加任務
future = this.executorService.submit(task);
exception = null;
break;
} catch (RejectedExecutionException e) {
exception = e;
this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
}
}
if (exception != null) {
// - 監(jiān)控點①
Profiler.functionError(callerInfo);
throw exception;
}
// - 監(jiān)控點①
Profiler.registerInfoEnd(callerInfo);
return future;
}
2、線程池并行任務
見代碼的「監(jiān)控點②」,分別在添加任務和任務完成后。
「監(jiān)控點②」實時統(tǒng)計在線程中執(zhí)行的總任務數量,用于評估線程池的任務的數量的滿載水平。
/** 任務并行數量統(tǒng)計 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);
public <T> Future<T> submit(Callable<T> task) {
RejectedExecutionException exception = null;
Future<T> future = null;
for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
try {
// - 添加任務
future = this.executorService.submit(()-> {
T rst = task.call();
// - 監(jiān)控點②
log.info("{} - Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.decrementAndGet());
return rst;
});
// - 監(jiān)控點②
log.info("{} + Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.incrementAndGet());
exception = null;
break;
} catch (RejectedExecutionException e) {
exception = e;
this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
}
}
if (exception != null) {
throw exception;
}
return future;
}
3、調節(jié)
線程池封裝對象策略的調節(jié)時機
1)上線前基于流量預估的壓測階段;
2)上線后跟進監(jiān)控數據和線程池中任務的滿載水平進行人工微調,也可以通過JOB在指定的時間自動調整;
3)大促前依據往期大促峰值來調高相關參數。
線程池封裝對象策略的調節(jié)經驗
1)訪問時長要求較低時,我們可以考慮調小線程數和阻塞隊列,適當調大提交任務重試等待時間和次數,以便降低資源占用。
2)訪問時長要求較高時,就需要調大線程數并保證相對較小的阻塞隊列,調小提交任務的重試等待時間和次數甚至分別調成0和1(即關閉重試提交邏輯)。
作者:京東零售 王文明文章來源:http://www.zghlxwxcb.cn/news/detail-711518.html
來源:京東云開發(fā)者社區(qū) 轉載請注明來源文章來源地址http://www.zghlxwxcb.cn/news/detail-711518.html
到了這里,關于頁面查詢多項數據組合的線程池設計的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!