場景:
前段時間在做一個數(shù)據(jù)同步工具,其中一個服務的任務是調(diào)用A服務的接口,將數(shù)據(jù)庫中指定數(shù)據(jù)請求過來,交給kafka去判斷哪些數(shù)據(jù)是需要新增,哪些數(shù)據(jù)是需要修改的。
剛開始的設計思路是,,我創(chuàng)建多個服務同時去請求A服務的接口,每個服務都請求到全量數(shù)據(jù),由于這些服務都注冊在xxl-job上,而且采用的是分片廣播的路由策略,那么每個服務就可以只處理請求到的所有數(shù)據(jù)中id%服務總數(shù)==分片索引的部分數(shù)據(jù),然后交給kafka,由kafka決定這條數(shù)據(jù)應該放到哪個分區(qū)上。
解決方案
最近學了線程池后,回過頭來思考,認為之前的方案還有很大的優(yōu)化空間。
- 1.當數(shù)據(jù)量很大時,一次性查詢所有數(shù)據(jù)會導致數(shù)據(jù)庫的負載過大,而使用分頁查詢,每次只查詢部分數(shù)據(jù),可以減輕數(shù)據(jù)庫的負擔,從而提高數(shù)據(jù)庫的性能和響應速度,所以請求數(shù)據(jù)方每次分頁查詢少量數(shù)據(jù),這樣可以整體降低請求數(shù)據(jù)的時間。
- 第一次優(yōu)化.之前是每個服務都要把全量數(shù)據(jù)請求過來,假設全量數(shù)據(jù)1000w條,一個服務請求數(shù)據(jù)需要100s,我開5個服務,那請求數(shù)據(jù)的總時長就是500s?,F(xiàn)在把1000w條數(shù)據(jù)均分給5個服務,那1個服務就只需要請求200w條數(shù)據(jù),耗時20s,那所有服務的請求總時長就是100s??傮w耗時縮小了5倍。上面說的分頁查詢就可以實現(xiàn):頁面大小假設10w(也就是將1000w/10w,邏輯上分成了100頁),每個服務自己的分片索引作為頁號,每次請求完,都給索引加上分片總數(shù)(例如:當前注冊了五個服務,那分片總數(shù)=5,對于分片索引為1的服務來說,請求的頁號為1,6,11,16,21。。。,對于分片索引為2的服務來說,請求的頁號為2,7,12,17。。。,對于分片索引為3的服務來說,請求的頁號為3,8,13,18,。。。。,對于分片索引為4的服務來說,請求的頁號為4,9,14,19。。。。,對于分片索引為5的服務來說,請求的頁號為5,10,15,20.。。)這樣1000w條數(shù)據(jù)就均分到每個服務上了。對于每個服務都是單線程去請求數(shù)據(jù),就可以將請求操作以及(頁號+總服務數(shù))的操作寫在一個while循環(huán)里,一直請求數(shù)據(jù),直到請求的數(shù)據(jù)為空時(也就是頁號超過100了),退出while。
//單線程情況下
while(true){
String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
// logger.info("body:{}",body);
//2.獲取返回結果的message
JSONObject jsonObject = new JSONObject();
// if (StrUtil.isNotBlank(body)) {
jsonObject = JSONUtil.parseObj(body);
// logger.info("name:{}",Thread.currentThread().getName());
// }
// logger.info("jsonObject:{}",jsonObject);
//3.從body中獲取data
List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
if(CollectionUtil.isEmpty(tests)){
break;
}
shardIndex+=shardTotal;
}
-
第二次優(yōu)化: 了解了線程池后,還可以再優(yōu)化。之前是一個服務單線程循環(huán)請求需要20s(假設),每次請求10w條,需要請求200w/10w,也就是20次,那一次請求就需要1s。如果使用線程池的話,那么耗時還會更小,因為當你將任務都交給線程池去執(zhí)行時,多個線程會同時(并行)去請求各自頁的數(shù)據(jù),假如你只設置了4個線程,那這4個線程會同時發(fā)起請求獲取數(shù)據(jù),1s會完成4次請求,那分給服務的200w,5s就請求完了。那5個服務從總耗時500s,降到了總耗時5s*5=25s。
這次優(yōu)化,第一版代碼(只展示了請求數(shù)據(jù)的代碼,其他業(yè)務代碼沒有展示)
一直向線程池里扔請求數(shù)據(jù)的任務,當某個任務請求到的數(shù)據(jù)是空的時候,意味著要請求的數(shù)據(jù)已經(jīng)沒了,那就結束循環(huán),不再扔請求數(shù)據(jù)的任務。
//線程共享變量
static volatile boolean flag = true;
@XxlJob(value = "fenpian")
public void fenpian() {
int shardIndex = XxlJobHelper.getShardIndex();
// int shardTotal = XxlJobHelper.getShardTotal();
//分片總數(shù)
int shardTotal = 4;
AtomicInteger pageNum = new AtomicInteger(shardIndex);
//多線程情況下
// List<CompletableFuture>completableFutureList=new ArrayList<>();
while (flag){
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));
JSONObject jsonObject = new JSONObject();
jsonObject = JSONUtil.parseObj(body);
List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
logger.info("tests的size:{}",tests.size());
if(CollectionUtil.isEmpty(tests)){
flag=false;
}
},executorService);
completableFutureList.add(future);
}
CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
CompletableFuture.allOf(completableFutures).join();
logger.info("任務結束");
executorService.shutdown();
上面代碼會有一個問題,就是while循環(huán)往線程池里扔任務,所有線程在執(zhí)行時,會在請求數(shù)據(jù)那里”停留“一段時間,“停留期間”還會一直循環(huán)向線程池扔任務,當線程執(zhí)行完某次請求得到空數(shù)據(jù)結束循環(huán)時,等待隊列中還排著大堆任務等著去請求數(shù)據(jù)。
為了解決這個問題,我改用了for循環(huán)提交任務,提前根據(jù)請求數(shù)據(jù)總量、每次讀取的條數(shù),以及服務總數(shù)得到每個服務需要執(zhí)行的任務數(shù)。
第二版代碼文章來源:http://www.zghlxwxcb.cn/news/detail-646513.html
@XxlJob(value = "fenpian")
public void fenpian() {
int shardIndex = XxlJobHelper.getShardIndex()+1;
int shardTotal = XxlJobHelper.getShardTotal();
//分片總數(shù)
// int shardTotal = 4;
AtomicInteger pageNum = new AtomicInteger(shardIndex);
//多線程情況下
List<CompletableFuture>completableFutureList=new ArrayList<>();
//總條數(shù)
double total = 10000000;
//讀取的條數(shù)
double pageSize=1000;
double tasks = Math.ceil( total / (double) shardTotal / pageSize);
logger.info("任務數(shù){}",tasks);
for(double i=0;i<tasks;i++){
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);
logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());
String body = HttpUtil.get(url);
JSONObject jsonObject = new JSONObject();
jsonObject = JSONUtil.parseObj(body);
List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
logger.info("tests的size:{}",tests.size());
},executorService);
completableFutureList.add(future);
}
CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
CompletableFuture.allOf(completableFutures).join();
logger.info("任務結束");
如有問題,請求指正(^^ゞ文章來源地址http://www.zghlxwxcb.cn/news/detail-646513.html
到了這里,關于【并發(fā)編程】線程池多線程異步去分頁調(diào)用其他服務接口獲取海量數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!