Java使用線程池批量處理數(shù)據(jù)操作
疑問&思路:
1.如何保證數(shù)據(jù)按順序批量處理
2.如何保證數(shù)據(jù)全部處理完統(tǒng)一返回
3.如何保證是多任務(wù)異步操作
4.如何提高運(yùn)行效率,減少運(yùn)行時(shí)間
1.使用ArrayList 插入數(shù)據(jù)有序且可重復(fù)
2.CountDownLatch / Future / CompletableFuture
3.多線程
4.線程池創(chuàng)建多線程
具體流程:
- 獲取需要進(jìn)行批量更新的大集合oldList,對大集合進(jìn)行拆分操作,分成N個(gè)小集合nweList-1 ~ nweList-N 。
- 開啟線程池,針對集合的大小進(jìn)行調(diào)參,對小集合進(jìn)行批量更新操作。
- 對流程進(jìn)行控制,控制線程執(zhí)行順序。
創(chuàng)建List分割工具類:
public class ListSplitUtils {
//這里使用泛型T 接收 做到通用工具類
//resList總數(shù)據(jù)List subListLength:需要切割的長度
public static <T> List<List<T>> split(List<T> resList, int subListLength) {
if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
return Lists.newArrayList();
}
List<List<T>> ret = Lists.newArrayList();
int size = resList.size();
if (size <= subListLength) { //指定數(shù)據(jù)過小直接處理
ret.add(resList);
} else {
int n = size / subListLength;
int last = size % subListLength;
// 分成n個(gè)集合,每個(gè)大小都是 subListLength 個(gè)元素
for (int i = 0; i < n; i++) {
List<T> itemList = Lists.newArrayList();
for (int j = 0; j < subListLength; j++) {
itemList.add(resList.get(i * subListLength + j));
}
ret.add(itemList);
}
// last的進(jìn)行處理
if (last > 0) {
List<T> itemList = Lists.newArrayList();
for (int i = 0; i < last; i++) {
itemList.add(resList.get(n* subListLength + i));
}
ret.add(itemList);
}
}
return ret;
}
創(chuàng)建線程池:文章來源:http://www.zghlxwxcb.cn/news/detail-486048.html
// 初始化線程池
/**
* corePoolSize: 一直保持的線程的數(shù)量,即使線程空閑也不會釋放。除非設(shè)置了 allowCoreThreadTimeout 為 true;
* maxPoolSize:允許最大的線程數(shù),隊(duì)列滿時(shí)開啟新線程直到等于該值;
* keepAliveTime:表示空閑線程的存活時(shí)間。當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime,該線程會退出,直到線程數(shù)量等于corePoolSize。只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)keepAliveTime才會起作用,直到線程中的線程數(shù)不大于corepoolSIze;
* TimeUnitunit:表示keepAliveTime的單位;
* workQueue:緩存任務(wù)的隊(duì)列;
* handler:表示當(dāng) workQueue 已滿,且池中的線程數(shù)達(dá)到 maxPoolSize 時(shí),線程池拒絕添加新任務(wù)時(shí)采取的策略。
*/
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50, 4, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
//大集合拆分成N個(gè)小集合,保證多線程異步執(zhí)行, 過大容易回到單線程
List<List<CheckRecordDetailsDanger>> splitNList = ListSplitUtils.split(CheckRecordDetailsDangerPage, 100); //先設(shè)置100 100以內(nèi)不考慮性能
// 記錄單個(gè)任務(wù)的執(zhí)行次數(shù)
CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
for (List<CheckRecordDetailsDanger> singleList : splitNList) {
// 線程池執(zhí)行
threadPool.execute(new Thread(() -> {
for (CheckRecordDetailsDanger checkRecordDetailsDanger : singleList) {
//統(tǒng)一賦值方法
//unifySetData(checkRecordDetailsDanger); 這是我的方法,需要替換成自己的處理邏輯
countDownLatch.countDown();
}
}));
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
后話
學(xué)習(xí)過程中可以了解一下 CountDownLatch 和 Future 以及 ThreadPoolExecutor 。文章來源地址http://www.zghlxwxcb.cn/news/detail-486048.html
到了這里,關(guān)于Java使用線程池批量處理數(shù)據(jù)操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!