場(chǎng)景:
前段時(shí)間在做一個(gè)數(shù)據(jù)同步工具,其中一個(gè)服務(wù)的任務(wù)是調(diào)用A服務(wù)的接口,將數(shù)據(jù)庫(kù)中指定數(shù)據(jù)請(qǐng)求過(guò)來(lái),交給kafka去判斷哪些數(shù)據(jù)是需要新增,哪些數(shù)據(jù)是需要修改的。
剛開(kāi)始的設(shè)計(jì)思路是,,我創(chuàng)建多個(gè)服務(wù)同時(shí)去請(qǐng)求A服務(wù)的接口,每個(gè)服務(wù)都請(qǐng)求到全量數(shù)據(jù),由于這些服務(wù)都注冊(cè)在xxl-job上,而且采用的是分片廣播的路由策略,那么每個(gè)服務(wù)就可以只處理請(qǐng)求到的所有數(shù)據(jù)中id%服務(wù)總數(shù)==分片索引的部分?jǐn)?shù)據(jù),然后交給kafka,由kafka決定這條數(shù)據(jù)應(yīng)該放到哪個(gè)分區(qū)上。
解決方案
最近學(xué)了線程池后,回過(guò)頭來(lái)思考,認(rèn)為之前的方案還有很大的優(yōu)化空間。
- 1.當(dāng)數(shù)據(jù)量很大時(shí),一次性查詢所有數(shù)據(jù)會(huì)導(dǎo)致數(shù)據(jù)庫(kù)的負(fù)載過(guò)大,而使用分頁(yè)查詢,每次只查詢部分?jǐn)?shù)據(jù),可以減輕數(shù)據(jù)庫(kù)的負(fù)擔(dān),從而提高數(shù)據(jù)庫(kù)的性能和響應(yīng)速度,所以請(qǐng)求數(shù)據(jù)方每次分頁(yè)查詢少量數(shù)據(jù),這樣可以整體降低請(qǐng)求數(shù)據(jù)的時(shí)間。
- 第一次優(yōu)化.之前是每個(gè)服務(wù)都要把全量數(shù)據(jù)請(qǐng)求過(guò)來(lái),假設(shè)全量數(shù)據(jù)1000w條,一個(gè)服務(wù)請(qǐng)求數(shù)據(jù)需要100s,我開(kāi)5個(gè)服務(wù),那請(qǐng)求數(shù)據(jù)的總時(shí)長(zhǎng)就是500s?,F(xiàn)在把1000w條數(shù)據(jù)均分給5個(gè)服務(wù),那1個(gè)服務(wù)就只需要請(qǐng)求200w條數(shù)據(jù),耗時(shí)20s,那所有服務(wù)的請(qǐng)求總時(shí)長(zhǎng)就是100s??傮w耗時(shí)縮小了5倍。上面說(shuō)的分頁(yè)查詢就可以實(shí)現(xiàn):頁(yè)面大小假設(shè)10w(也就是將1000w/10w,邏輯上分成了100頁(yè)),每個(gè)服務(wù)自己的分片索引作為頁(yè)號(hào),每次請(qǐng)求完,都給索引加上分片總數(shù)(例如:當(dāng)前注冊(cè)了五個(gè)服務(wù),那分片總數(shù)=5,對(duì)于分片索引為1的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為1,6,11,16,21。。。,對(duì)于分片索引為2的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為2,7,12,17。。。,對(duì)于分片索引為3的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為3,8,13,18,。。。。,對(duì)于分片索引為4的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為4,9,14,19。。。。,對(duì)于分片索引為5的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為5,10,15,20.。。)這樣1000w條數(shù)據(jù)就均分到每個(gè)服務(wù)上了。對(duì)于每個(gè)服務(wù)都是單線程去請(qǐng)求數(shù)據(jù),就可以將請(qǐng)求操作以及(頁(yè)號(hào)+總服務(wù)數(shù))的操作寫(xiě)在一個(gè)while循環(huán)里,一直請(qǐng)求數(shù)據(jù),直到請(qǐng)求的數(shù)據(jù)為空時(shí)(也就是頁(yè)號(hào)超過(guò)100了),退出while。
//單線程情況下
while(true){
String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
// logger.info("body:{}",body);
//2.獲取返回結(jié)果的message
JSONObject jsonObject = new JSONObject();
// if (StrUtil.isNotBlank(body)) {
jsonObject = JSONUtil.parseObj(body);
// logger.info("name:{}",Thread.currentThread().getName());
// }
// logger.info("jsonObject:{}",jsonObject);
//3.從body中獲取data
List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
if(CollectionUtil.isEmpty(tests)){
break;
}
shardIndex+=shardTotal;
}
-
第二次優(yōu)化: 了解了線程池后,還可以再優(yōu)化。之前是一個(gè)服務(wù)單線程循環(huán)請(qǐng)求需要20s(假設(shè)),每次請(qǐng)求10w條,需要請(qǐng)求200w/10w,也就是20次,那一次請(qǐng)求就需要1s。如果使用線程池的話,那么耗時(shí)還會(huì)更小,因?yàn)楫?dāng)你將任務(wù)都交給線程池去執(zhí)行時(shí),多個(gè)線程會(huì)同時(shí)(并行)去請(qǐng)求各自頁(yè)的數(shù)據(jù),假如你只設(shè)置了4個(gè)線程,那這4個(gè)線程會(huì)同時(shí)發(fā)起請(qǐng)求獲取數(shù)據(jù),1s會(huì)完成4次請(qǐng)求,那分給服務(wù)的200w,5s就請(qǐng)求完了。那5個(gè)服務(wù)從總耗時(shí)500s,降到了總耗時(shí)5s*5=25s。
這次優(yōu)化,第一版代碼(只展示了請(qǐng)求數(shù)據(jù)的代碼,其他業(yè)務(wù)代碼沒(méi)有展示)
一直向線程池里扔請(qǐng)求數(shù)據(jù)的任務(wù),當(dāng)某個(gè)任務(wù)請(qǐng)求到的數(shù)據(jù)是空的時(shí)候,意味著要請(qǐng)求的數(shù)據(jù)已經(jīng)沒(méi)了,那就結(jié)束循環(huán),不再扔請(qǐng)求數(shù)據(jù)的任務(wù)。
//線程共享變量
static volatile boolean flag = true;
@XxlJob(value = "fenpian")
public void fenpian() {
int shardIndex = XxlJobHelper.getShardIndex();
// int shardTotal = XxlJobHelper.getShardTotal();
//分片總數(shù)
int shardTotal = 4;
AtomicInteger pageNum = new AtomicInteger(shardIndex);
//多線程情況下
// List<CompletableFuture>completableFutureList=new ArrayList<>();
while (flag){
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));
JSONObject jsonObject = new JSONObject();
jsonObject = JSONUtil.parseObj(body);
List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
logger.info("tests的size:{}",tests.size());
if(CollectionUtil.isEmpty(tests)){
flag=false;
}
},executorService);
completableFutureList.add(future);
}
CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
CompletableFuture.allOf(completableFutures).join();
logger.info("任務(wù)結(jié)束");
executorService.shutdown();
上面代碼會(huì)有一個(gè)問(wèn)題,就是while循環(huán)往線程池里扔任務(wù),所有線程在執(zhí)行時(shí),會(huì)在請(qǐng)求數(shù)據(jù)那里”停留“一段時(shí)間,“停留期間”還會(huì)一直循環(huán)向線程池扔任務(wù),當(dāng)線程執(zhí)行完某次請(qǐng)求得到空數(shù)據(jù)結(jié)束循環(huán)時(shí),等待隊(duì)列中還排著大堆任務(wù)等著去請(qǐng)求數(shù)據(jù)。
為了解決這個(gè)問(wèn)題,我改用了for循環(huán)提交任務(wù),提前根據(jù)請(qǐng)求數(shù)據(jù)總量、每次讀取的條數(shù),以及服務(wù)總數(shù)得到每個(gè)服務(wù)需要執(zhí)行的任務(wù)數(shù)。
第二版代碼文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-651554.html
@XxlJob(value = "fenpian")
public void fenpian() {
int shardIndex = XxlJobHelper.getShardIndex()+1;
int shardTotal = XxlJobHelper.getShardTotal();
//分片總數(shù)
// int shardTotal = 4;
AtomicInteger pageNum = new AtomicInteger(shardIndex);
//多線程情況下
List<CompletableFuture>completableFutureList=new ArrayList<>();
//總條數(shù)
double total = 10000000;
//讀取的條數(shù)
double pageSize=1000;
double tasks = Math.ceil( total / (double) shardTotal / pageSize);
logger.info("任務(wù)數(shù){}",tasks);
for(double i=0;i<tasks;i++){
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);
logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());
String body = HttpUtil.get(url);
JSONObject jsonObject = new JSONObject();
jsonObject = JSONUtil.parseObj(body);
List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
logger.info("tests的size:{}",tests.size());
},executorService);
completableFutureList.add(future);
}
CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
CompletableFuture.allOf(completableFutures).join();
logger.info("任務(wù)結(jié)束");
如有問(wèn)題,請(qǐng)求指正(^^ゞ文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-651554.html
到了這里,關(guān)于【并發(fā)編程】自研數(shù)據(jù)同步工具的優(yōu)化:創(chuàng)建線程池多線程異步去分頁(yè)調(diào)用其他服務(wù)接口獲取海量數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!