国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【并發(fā)編程】線程池多線程異步去分頁調(diào)用其他服務接口獲取海量數(shù)據(jù)

這篇具有很好參考價值的文章主要介紹了【并發(fā)編程】線程池多線程異步去分頁調(diào)用其他服務接口獲取海量數(shù)據(jù)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

場景:

前段時間在做一個數(shù)據(jù)同步工具,其中一個服務的任務是調(diào)用A服務的接口,將數(shù)據(jù)庫中指定數(shù)據(jù)請求過來,交給kafka去判斷哪些數(shù)據(jù)是需要新增,哪些數(shù)據(jù)是需要修改的。

剛開始的設計思路是,,我創(chuàng)建多個服務同時去請求A服務的接口,每個服務都請求到全量數(shù)據(jù),由于這些服務都注冊在xxl-job上,而且采用的是分片廣播的路由策略,那么每個服務就可以只處理請求到的所有數(shù)據(jù)中id%服務總數(shù)==分片索引的部分數(shù)據(jù),然后交給kafka,由kafka決定這條數(shù)據(jù)應該放到哪個分區(qū)上。

解決方案

最近學了線程池后,回過頭來思考,認為之前的方案還有很大的優(yōu)化空間。

  • 1.當數(shù)據(jù)量很大時,一次性查詢所有數(shù)據(jù)會導致數(shù)據(jù)庫的負載過大,而使用分頁查詢,每次只查詢部分數(shù)據(jù),可以減輕數(shù)據(jù)庫的負擔,從而提高數(shù)據(jù)庫的性能和響應速度,所以請求數(shù)據(jù)方每次分頁查詢少量數(shù)據(jù),這樣可以整體降低請求數(shù)據(jù)的時間。
  • 第一次優(yōu)化.之前是每個服務都要把全量數(shù)據(jù)請求過來,假設全量數(shù)據(jù)1000w條,一個服務請求數(shù)據(jù)需要100s,我開5個服務,那請求數(shù)據(jù)的總時長就是500s?,F(xiàn)在把1000w條數(shù)據(jù)均分給5個服務,那1個服務就只需要請求200w條數(shù)據(jù),耗時20s,那所有服務的請求總時長就是100s??傮w耗時縮小了5倍。上面說的分頁查詢就可以實現(xiàn):頁面大小假設10w(也就是將1000w/10w,邏輯上分成了100頁),每個服務自己的分片索引作為頁號,每次請求完,都給索引加上分片總數(shù)(例如:當前注冊了五個服務,那分片總數(shù)=5,對于分片索引為1的服務來說,請求的頁號為1,6,11,16,21。。。,對于分片索引為2的服務來說,請求的頁號為2,7,12,17。。。,對于分片索引為3的服務來說,請求的頁號為3,8,13,18,。。。。,對于分片索引為4的服務來說,請求的頁號為4,9,14,19。。。。,對于分片索引為5的服務來說,請求的頁號為5,10,15,20.。。)這樣1000w條數(shù)據(jù)就均分到每個服務上了。對于每個服務都是單線程去請求數(shù)據(jù),就可以將請求操作以及(頁號+總服務數(shù))的操作寫在一個while循環(huán)里,一直請求數(shù)據(jù),直到請求的數(shù)據(jù)為空時(也就是頁號超過100了),退出while。
        //單線程情況下
        while(true){

            String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
//        logger.info("body:{}",body);
            //2.獲取返回結果的message
            JSONObject jsonObject = new JSONObject();
//        if (StrUtil.isNotBlank(body)) {
            jsonObject = JSONUtil.parseObj(body);
//            logger.info("name:{}",Thread.currentThread().getName());
//        }
//        logger.info("jsonObject:{}",jsonObject);
            //3.從body中獲取data
            List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
            if(CollectionUtil.isEmpty(tests)){
                break;
            }
            shardIndex+=shardTotal;
        }
  • 第二次優(yōu)化: 了解了線程池后,還可以再優(yōu)化。之前是一個服務單線程循環(huán)請求需要20s(假設),每次請求10w條,需要請求200w/10w,也就是20次,那一次請求就需要1s。如果使用線程池的話,那么耗時還會更小,因為當你將任務都交給線程池去執(zhí)行時,多個線程會同時(并行)去請求各自頁的數(shù)據(jù),假如你只設置了4個線程,那這4個線程會同時發(fā)起請求獲取數(shù)據(jù),1s會完成4次請求,那分給服務的200w,5s就請求完了。那5個服務從總耗時500s,降到了總耗時5s*5=25s。
    這次優(yōu)化,第一版代碼(只展示了請求數(shù)據(jù)的代碼,其他業(yè)務代碼沒有展示)
    一直向線程池里扔請求數(shù)據(jù)的任務,當某個任務請求到的數(shù)據(jù)是空的時候,意味著要請求的數(shù)據(jù)已經(jīng)沒了,那就結束循環(huán),不再扔請求數(shù)據(jù)的任務。
    //線程共享變量
    static volatile boolean flag = true;
    @XxlJob(value = "fenpian")
    public void fenpian() {
        int shardIndex = XxlJobHelper.getShardIndex();
//        int shardTotal = XxlJobHelper.getShardTotal();
        //分片總數(shù)
        int shardTotal = 4;
        AtomicInteger pageNum = new AtomicInteger(shardIndex);
        //多線程情況下
//        List<CompletableFuture>completableFutureList=new ArrayList<>();
        while (flag){
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));
	             JSONObject jsonObject = new JSONObject();
	             jsonObject = JSONUtil.parseObj(body);
	             List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
	             logger.info("tests的size:{}",tests.size());
	             if(CollectionUtil.isEmpty(tests)){
	                 flag=false;
	             }
            },executorService);


        completableFutureList.add(future);
        }
        CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
        CompletableFuture.allOf(completableFutures).join();
       logger.info("任務結束");
        executorService.shutdown();

上面代碼會有一個問題,就是while循環(huán)往線程池里扔任務,所有線程在執(zhí)行時,會在請求數(shù)據(jù)那里”停留“一段時間,“停留期間”還會一直循環(huán)向線程池扔任務,當線程執(zhí)行完某次請求得到空數(shù)據(jù)結束循環(huán)時,等待隊列中還排著大堆任務等著去請求數(shù)據(jù)。

為了解決這個問題,我改用了for循環(huán)提交任務,提前根據(jù)請求數(shù)據(jù)總量、每次讀取的條數(shù),以及服務總數(shù)得到每個服務需要執(zhí)行的任務數(shù)。
第二版代碼

@XxlJob(value = "fenpian")
    public void fenpian() {
        int shardIndex = XxlJobHelper.getShardIndex()+1;
        int shardTotal = XxlJobHelper.getShardTotal();
        //分片總數(shù)
//        int shardTotal = 4;
        AtomicInteger pageNum = new AtomicInteger(shardIndex);
        //多線程情況下
        List<CompletableFuture>completableFutureList=new ArrayList<>();
        //總條數(shù)
        double total = 10000000;
        //讀取的條數(shù)
        double pageSize=1000;
        double tasks = Math.ceil( total / (double) shardTotal / pageSize);
        logger.info("任務數(shù){}",tasks);
        for(double i=0;i<tasks;i++){
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);
                logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());
                String body = HttpUtil.get(url);
                JSONObject jsonObject = new JSONObject();
                jsonObject = JSONUtil.parseObj(body);
                List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
                logger.info("tests的size:{}",tests.size());
            },executorService);
        completableFutureList.add(future);
        }
        CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
        CompletableFuture.allOf(completableFutures).join();
       logger.info("任務結束");

如有問題,請求指正(^^ゞ文章來源地址http://www.zghlxwxcb.cn/news/detail-646513.html

到了這里,關于【并發(fā)編程】線程池多線程異步去分頁調(diào)用其他服務接口獲取海量數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • (線程池)多線程使用場景--es數(shù)據(jù)批量導入、數(shù)據(jù)匯總、異步調(diào)用;如何控制某個方法允許并發(fā)訪問線程的數(shù)量;對ThreadLocal的理解及實現(xiàn)原理、源碼解析、ThreadLocal的內(nèi)存泄露問題

    (線程池)多線程使用場景--es數(shù)據(jù)批量導入、數(shù)據(jù)匯總、異步調(diào)用;如何控制某個方法允許并發(fā)訪問線程的數(shù)量;對ThreadLocal的理解及實現(xiàn)原理、源碼解析、ThreadLocal的內(nèi)存泄露問題

    CountDownLatch(閉鎖/倒計時鎖) 用來進行線程同步協(xié)作,等待所有線程完成倒計時(一個或者多個線程,等待其他多個線程完成某件事情之后才能執(zhí)行) 其中構造參數(shù)用來初始化等待計數(shù)值 await() 用來等待計數(shù)歸零 countDown() 用來讓計數(shù) 減一 多線程使用場景一:( es數(shù)據(jù)批量導

    2024年04月25日
    瀏覽(46)
  • 并發(fā)編程 --- 異步方法的異常處理

    現(xiàn)在模擬一個異步方法拋出了異常: 思考一下, DontHandle() 方法是否能夠捕獲到異常? 答案是:不會捕獲到異常! 因為 DontHandle() 方法在 ThrowAfter() 方法拋出異常之前,就已經(jīng)執(zhí)行完畢。 那么上述代碼怎么才能捕獲到異常呢? 若想要捕獲異常則必須通過 await 等待 Th

    2024年02月15日
    瀏覽(18)
  • Rust基礎拾遺--并發(fā)和異步編程

    ? ?通過 Rust程序設計-第二版 筆記的形式對Rust相關 重點知識 進行匯總,讀者通讀此系列文章就可以輕松的把該語言基礎撿起來。 為什么一些看似正確的多線程慣用法卻根本不起作用? 與“內(nèi)存模型”有關 你最終會找到一種自己用起來順手且不會經(jīng)常出錯的并發(fā)慣用法。

    2024年02月19日
    瀏覽(27)
  • JUC并發(fā)編程學習筆記(十四)異步回調(diào)

    JUC并發(fā)編程學習筆記(十四)異步回調(diào)

    Future設計的初衷:對將來的某個事件的結果進行建模 在Future類的子類中可以找到CompletableFuture,在介紹中可以看到這是為非異步的請求使用一些異步的方法來處理 點進具體實現(xiàn)類中,查看方法,可以看到CompletableFuture中的異步內(nèi)部類,里面是實現(xiàn)的異步方法 以及一些異步方法

    2024年02月05日
    瀏覽(19)
  • Python異步編程之web框架 異步vs同步 Redis并發(fā)對比

    Python異步編程之web框架 異步vs同步 Redis并發(fā)對比

    主題: 比較異步框架和同步框架在RedisIO操作的性能差異 python版本 :python 3.8 數(shù)據(jù)庫 :redis 5.0.7 壓測工具 :locust web框架 :同步:flask 異步:starlette 請求并發(fā)量 : 模擬10個用戶 服務器配置 : Intel(R) i7-12700F 客戶端配置 :Intel(R) i7-8700 3.20GHz flask是python中輕量級web框架,特點是靈

    2024年02月10日
    瀏覽(29)
  • 并發(fā)編程 | CompletionService - 如何優(yōu)雅地處理批量異步任務

    上一篇文章中,我們詳細地介紹了 CompletableFuture,它是一種強大的并發(fā)工具,能幫助我們以聲明式的方式處理異步任務。雖然 CompletableFuture 很強大,但它并不總是最適合所有場景的解決方案。 在這篇文章中,我們將介紹 Java 的 CompletionService,這是一種能處理 批量異步任務

    2024年02月15日
    瀏覽(40)
  • 并發(fā)編程 | 從Future到CompletableFuture - 簡化 Java 中的異步編程

    在并發(fā)編程中,我們經(jīng)常需要處理多線程的任務,這些任務往往具有依賴性,異步性,且需要在所有任務完成后獲取結果。Java 8 引入了 CompletableFuture 類,它帶來了一種新的編程模式,讓我們能夠以函數(shù)式編程的方式處理并發(fā)任務,顯著提升了代碼的可讀性和簡潔性。 在這篇

    2024年02月13日
    瀏覽(27)
  • 并發(fā)編程(高并發(fā)、多線程)

    并發(fā)編程(高并發(fā)、多線程)

    1.1.1 并發(fā)編程三要素 首先我們要了解并發(fā)編程的三要素 原子性 可見性 有序性 1.原子性 原子性是指一個操作是不可分割的單元,要么全部執(zhí)行成功,要么全部失敗。 在并發(fā)環(huán)境中,多個線程可能同時訪問和修改共享的數(shù)據(jù),為了確保數(shù)據(jù)的一致性,需要確保一組相關的操作

    2024年02月04日
    瀏覽(24)
  • 多線程并發(fā)編程-線程篇

    多線程并發(fā)編程-線程篇

    系統(tǒng)中的一個程序就是一個進程,每個進程中的最基本的執(zhí)行單位,執(zhí)行路徑就是線程,線程是輕量化的進程。 綠色線程,由用戶自己進行管理的而不是系統(tǒng)進行管理的,我理解就是一個進程里面可以有多線程,一個線程里面有多進程(go里面叫協(xié)程) 線程是按照CPU分的時間

    2023年04月21日
    瀏覽(25)
  • Python異步編程高并發(fā)執(zhí)行爬蟲采集,用回調(diào)函數(shù)解析響應

    Python異步編程高并發(fā)執(zhí)行爬蟲采集,用回調(diào)函數(shù)解析響應

    異步技術是Python編程中對提升性能非常重要的一項技術。在實際應用,經(jīng)常面臨對外發(fā)送網(wǎng)絡請求,調(diào)用外部接口,或者不斷更新數(shù)據(jù)庫或文件等操作。 這這些操作,通常90%以上時間是在等待,如通過REST, gRPC向服務器發(fā)送請求,通??赡艿却龓资撩胫翈酌耄踔粮L。如

    2024年02月08日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包