国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【并發(fā)編程】自研數(shù)據(jù)同步工具的優(yōu)化:創(chuàng)建線程池多線程異步去分頁(yè)調(diào)用其他服務(wù)接口獲取海量數(shù)據(jù)

這篇具有很好參考價(jià)值的文章主要介紹了【并發(fā)編程】自研數(shù)據(jù)同步工具的優(yōu)化:創(chuàng)建線程池多線程異步去分頁(yè)調(diào)用其他服務(wù)接口獲取海量數(shù)據(jù)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

場(chǎng)景:

前段時(shí)間在做一個(gè)數(shù)據(jù)同步工具,其中一個(gè)服務(wù)的任務(wù)是調(diào)用A服務(wù)的接口,將數(shù)據(jù)庫(kù)中指定數(shù)據(jù)請(qǐng)求過(guò)來(lái),交給kafka去判斷哪些數(shù)據(jù)是需要新增,哪些數(shù)據(jù)是需要修改的。

剛開(kāi)始的設(shè)計(jì)思路是,,我創(chuàng)建多個(gè)服務(wù)同時(shí)去請(qǐng)求A服務(wù)的接口,每個(gè)服務(wù)都請(qǐng)求到全量數(shù)據(jù),由于這些服務(wù)都注冊(cè)在xxl-job上,而且采用的是分片廣播的路由策略,那么每個(gè)服務(wù)就可以只處理請(qǐng)求到的所有數(shù)據(jù)中id%服務(wù)總數(shù)==分片索引的部分?jǐn)?shù)據(jù),然后交給kafka,由kafka決定這條數(shù)據(jù)應(yīng)該放到哪個(gè)分區(qū)上。

解決方案

最近學(xué)了線程池后,回過(guò)頭來(lái)思考,認(rèn)為之前的方案還有很大的優(yōu)化空間。

  • 1.當(dāng)數(shù)據(jù)量很大時(shí),一次性查詢所有數(shù)據(jù)會(huì)導(dǎo)致數(shù)據(jù)庫(kù)的負(fù)載過(guò)大,而使用分頁(yè)查詢,每次只查詢部分?jǐn)?shù)據(jù),可以減輕數(shù)據(jù)庫(kù)的負(fù)擔(dān),從而提高數(shù)據(jù)庫(kù)的性能和響應(yīng)速度,所以請(qǐng)求數(shù)據(jù)方每次分頁(yè)查詢少量數(shù)據(jù),這樣可以整體降低請(qǐng)求數(shù)據(jù)的時(shí)間。
  • 第一次優(yōu)化.之前是每個(gè)服務(wù)都要把全量數(shù)據(jù)請(qǐng)求過(guò)來(lái),假設(shè)全量數(shù)據(jù)1000w條,一個(gè)服務(wù)請(qǐng)求數(shù)據(jù)需要100s,我開(kāi)5個(gè)服務(wù),那請(qǐng)求數(shù)據(jù)的總時(shí)長(zhǎng)就是500s?,F(xiàn)在把1000w條數(shù)據(jù)均分給5個(gè)服務(wù),那1個(gè)服務(wù)就只需要請(qǐng)求200w條數(shù)據(jù),耗時(shí)20s,那所有服務(wù)的請(qǐng)求總時(shí)長(zhǎng)就是100s??傮w耗時(shí)縮小了5倍。上面說(shuō)的分頁(yè)查詢就可以實(shí)現(xiàn):頁(yè)面大小假設(shè)10w(也就是將1000w/10w,邏輯上分成了100頁(yè)),每個(gè)服務(wù)自己的分片索引作為頁(yè)號(hào),每次請(qǐng)求完,都給索引加上分片總數(shù)(例如:當(dāng)前注冊(cè)了五個(gè)服務(wù),那分片總數(shù)=5,對(duì)于分片索引為1的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為1,6,11,16,21。。。,對(duì)于分片索引為2的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為2,7,12,17。。。,對(duì)于分片索引為3的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為3,8,13,18,。。。。,對(duì)于分片索引為4的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為4,9,14,19。。。。,對(duì)于分片索引為5的服務(wù)來(lái)說(shuō),請(qǐng)求的頁(yè)號(hào)為5,10,15,20.。。)這樣1000w條數(shù)據(jù)就均分到每個(gè)服務(wù)上了。對(duì)于每個(gè)服務(wù)都是單線程去請(qǐng)求數(shù)據(jù),就可以將請(qǐng)求操作以及(頁(yè)號(hào)+總服務(wù)數(shù))的操作寫(xiě)在一個(gè)while循環(huán)里,一直請(qǐng)求數(shù)據(jù),直到請(qǐng)求的數(shù)據(jù)為空時(shí)(也就是頁(yè)號(hào)超過(guò)100了),退出while。
        //單線程情況下
        while(true){

            String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
//        logger.info("body:{}",body);
            //2.獲取返回結(jié)果的message
            JSONObject jsonObject = new JSONObject();
//        if (StrUtil.isNotBlank(body)) {
            jsonObject = JSONUtil.parseObj(body);
//            logger.info("name:{}",Thread.currentThread().getName());
//        }
//        logger.info("jsonObject:{}",jsonObject);
            //3.從body中獲取data
            List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
            if(CollectionUtil.isEmpty(tests)){
                break;
            }
            shardIndex+=shardTotal;
        }
  • 第二次優(yōu)化: 了解了線程池后,還可以再優(yōu)化。之前是一個(gè)服務(wù)單線程循環(huán)請(qǐng)求需要20s(假設(shè)),每次請(qǐng)求10w條,需要請(qǐng)求200w/10w,也就是20次,那一次請(qǐng)求就需要1s。如果使用線程池的話,那么耗時(shí)還會(huì)更小,因?yàn)楫?dāng)你將任務(wù)都交給線程池去執(zhí)行時(shí),多個(gè)線程會(huì)同時(shí)(并行)去請(qǐng)求各自頁(yè)的數(shù)據(jù),假如你只設(shè)置了4個(gè)線程,那這4個(gè)線程會(huì)同時(shí)發(fā)起請(qǐng)求獲取數(shù)據(jù),1s會(huì)完成4次請(qǐng)求,那分給服務(wù)的200w,5s就請(qǐng)求完了。那5個(gè)服務(wù)從總耗時(shí)500s,降到了總耗時(shí)5s*5=25s。
    這次優(yōu)化,第一版代碼(只展示了請(qǐng)求數(shù)據(jù)的代碼,其他業(yè)務(wù)代碼沒(méi)有展示)
    一直向線程池里扔請(qǐng)求數(shù)據(jù)的任務(wù),當(dāng)某個(gè)任務(wù)請(qǐng)求到的數(shù)據(jù)是空的時(shí)候,意味著要請(qǐng)求的數(shù)據(jù)已經(jīng)沒(méi)了,那就結(jié)束循環(huán),不再扔請(qǐng)求數(shù)據(jù)的任務(wù)。
    //線程共享變量
    static volatile boolean flag = true;
    @XxlJob(value = "fenpian")
    public void fenpian() {
        int shardIndex = XxlJobHelper.getShardIndex();
//        int shardTotal = XxlJobHelper.getShardTotal();
        //分片總數(shù)
        int shardTotal = 4;
        AtomicInteger pageNum = new AtomicInteger(shardIndex);
        //多線程情況下
//        List<CompletableFuture>completableFutureList=new ArrayList<>();
        while (flag){
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));
	             JSONObject jsonObject = new JSONObject();
	             jsonObject = JSONUtil.parseObj(body);
	             List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
	             logger.info("tests的size:{}",tests.size());
	             if(CollectionUtil.isEmpty(tests)){
	                 flag=false;
	             }
            },executorService);


        completableFutureList.add(future);
        }
        CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
        CompletableFuture.allOf(completableFutures).join();
       logger.info("任務(wù)結(jié)束");
        executorService.shutdown();

上面代碼會(huì)有一個(gè)問(wèn)題,就是while循環(huán)往線程池里扔任務(wù),所有線程在執(zhí)行時(shí),會(huì)在請(qǐng)求數(shù)據(jù)那里”停留“一段時(shí)間,“停留期間”還會(huì)一直循環(huán)向線程池扔任務(wù),當(dāng)線程執(zhí)行完某次請(qǐng)求得到空數(shù)據(jù)結(jié)束循環(huán)時(shí),等待隊(duì)列中還排著大堆任務(wù)等著去請(qǐng)求數(shù)據(jù)。

為了解決這個(gè)問(wèn)題,我改用了for循環(huán)提交任務(wù),提前根據(jù)請(qǐng)求數(shù)據(jù)總量、每次讀取的條數(shù),以及服務(wù)總數(shù)得到每個(gè)服務(wù)需要執(zhí)行的任務(wù)數(shù)。
第二版代碼

@XxlJob(value = "fenpian")
    public void fenpian() {
        int shardIndex = XxlJobHelper.getShardIndex()+1;
        int shardTotal = XxlJobHelper.getShardTotal();
        //分片總數(shù)
//        int shardTotal = 4;
        AtomicInteger pageNum = new AtomicInteger(shardIndex);
        //多線程情況下
        List<CompletableFuture>completableFutureList=new ArrayList<>();
        //總條數(shù)
        double total = 10000000;
        //讀取的條數(shù)
        double pageSize=1000;
        double tasks = Math.ceil( total / (double) shardTotal / pageSize);
        logger.info("任務(wù)數(shù){}",tasks);
        for(double i=0;i<tasks;i++){
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);
                logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());
                String body = HttpUtil.get(url);
                JSONObject jsonObject = new JSONObject();
                jsonObject = JSONUtil.parseObj(body);
                List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
                logger.info("tests的size:{}",tests.size());
            },executorService);
        completableFutureList.add(future);
        }
        CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
        CompletableFuture.allOf(completableFutures).join();
       logger.info("任務(wù)結(jié)束");

如有問(wèn)題,請(qǐng)求指正(^^ゞ文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-651554.html

到了這里,關(guān)于【并發(fā)編程】自研數(shù)據(jù)同步工具的優(yōu)化:創(chuàng)建線程池多線程異步去分頁(yè)調(diào)用其他服務(wù)接口獲取海量數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【并發(fā)編程】SpringBoot創(chuàng)建線程池的六種方式

    1. 自定義線程池 1.1 示例代碼 ??控制臺(tái)打?。?2. 固定長(zhǎng)度線程池 2.1 示例代碼 ??控制臺(tái)打印: ??前3個(gè)任務(wù)被同時(shí)執(zhí)行,因?yàn)閯偤糜?個(gè)核心線程。后2個(gè)任務(wù)會(huì)被存放到阻塞隊(duì)列,當(dāng)執(zhí)行前3個(gè)任務(wù)的某個(gè)線程空閑時(shí)會(huì)從隊(duì)列中獲取任務(wù)并執(zhí)行。 2.2 源碼剖析 ??該類型

    2024年02月16日
    瀏覽(27)
  • Java并發(fā)編程學(xué)習(xí)筆記(一)線程的入門(mén)與創(chuàng)建

    Java并發(fā)編程學(xué)習(xí)筆記(一)線程的入門(mén)與創(chuàng)建

    認(rèn)識(shí) 程序由指令和數(shù)據(jù)組成,簡(jiǎn)單來(lái)說(shuō),進(jìn)程可以視為程序的一個(gè)實(shí)例 大部分程序可以同時(shí)運(yùn)行多個(gè)實(shí)例進(jìn)程,例如記事本、畫(huà)圖、瀏覽器等 少部分程序只能同時(shí)運(yùn)行一個(gè)實(shí)例進(jìn)程,例如QQ音樂(lè)、網(wǎng)易云音樂(lè)等 一個(gè)進(jìn)程可以分為多個(gè)線程,線程為最小調(diào)度單位,進(jìn)程則是作

    2024年02月16日
    瀏覽(48)
  • JUC并發(fā)編程-集合不安全情況以及Callable線程創(chuàng)建方式

    JUC并發(fā)編程-集合不安全情況以及Callable線程創(chuàng)建方式

    1)List 不安全 ArrayList 在并發(fā)情況下是不安全的 解決方案 : 1.Vector 2.Collections.synchonizedList() 3. CopyOnWriteArrayList 核心思想 是,如果有 多個(gè)調(diào)用者(Callers)同時(shí)要求相同的資源 (如內(nèi)存或者是磁盤(pán)上的數(shù)據(jù)存儲(chǔ)),他們 會(huì)共同獲取相同的指針指向相同的資源 , 直到某個(gè)調(diào)用者

    2024年01月23日
    瀏覽(39)
  • 大家都說(shuō)Java有三種創(chuàng)建線程的方式!并發(fā)編程中的驚天騙局!

    大家都說(shuō)Java有三種創(chuàng)建線程的方式!并發(fā)編程中的驚天騙局!

    在Java中,創(chuàng)建線程是一項(xiàng)非常重要的任務(wù)。線程是一種輕量級(jí)的子進(jìn)程,可以并行執(zhí)行,使得程序的執(zhí)行效率得到提高。Java提供了多種方式來(lái)創(chuàng)建線程,但許多人都認(rèn)為Java有三種創(chuàng)建線程的方式,它們分別是 繼承Thread類、實(shí)現(xiàn)Runnable接口和使用線程池。 但是,你們知道嗎?

    2024年02月08日
    瀏覽(25)
  • 《C++并發(fā)編程實(shí)戰(zhàn)》讀書(shū)筆記(2):線程間共享數(shù)據(jù)

    在C++中,我們通過(guò)構(gòu)造 std::mutex 的實(shí)例來(lái)創(chuàng)建互斥量,調(diào)用成員函數(shù) lock() 對(duì)其加鎖,調(diào)用 unlock() 解鎖。但通常更推薦的做法是使用標(biāo)準(zhǔn)庫(kù)提供的類模板 std::lock_guard ,它針對(duì)互斥量實(shí)現(xiàn)了RAII手法:在構(gòu)造時(shí)給互斥量加鎖,析構(gòu)時(shí)解鎖。兩個(gè)類都在頭文件 mutex 里聲明。 假設(shè)

    2024年02月10日
    瀏覽(11)
  • c++并發(fā)編程實(shí)戰(zhàn)-第3章 在線程間共享數(shù)據(jù)

    多線程之間共享數(shù)據(jù),最大的問(wèn)題便是數(shù)據(jù)競(jìng)爭(zhēng)導(dǎo)致的異常問(wèn)題。多個(gè)線程操作同一塊資源,如果不做任何限制,那么一定會(huì)發(fā)生錯(cuò)誤。例如: 輸出: 顯然,上面的輸出結(jié)果存在問(wèn)題。出現(xiàn)錯(cuò)誤的原因可能是: 某一時(shí)刻, th1線程獲得CPU時(shí)間片,將g_nResource從100增加至200后時(shí)

    2024年02月08日
    瀏覽(14)
  • 【Linux系統(tǒng)編程:線程】 線程控制 -- 創(chuàng)建、終止、等待、分離 | 線程互斥與同步 | 互斥量與條件變量 | 生產(chǎn)者消費(fèi)者模型 | 線程池 | STL/智能指針與線程安全 | 讀者寫(xiě)者模型

    【Linux系統(tǒng)編程:線程】 線程控制 -- 創(chuàng)建、終止、等待、分離 | 線程互斥與同步 | 互斥量與條件變量 | 生產(chǎn)者消費(fèi)者模型 | 線程池 | STL/智能指針與線程安全 | 讀者寫(xiě)者模型

    寫(xiě)在前面 本文重點(diǎn): 了解線程概念,理解線程與進(jìn)程區(qū)別與聯(lián)系。 學(xué)會(huì)線程控制,線程創(chuàng)建,線程終止,線程等待。 了解線程分離與線程安全。 學(xué)會(huì)線程同步。 學(xué)會(huì)使用互斥量,條件變量,posix 信號(hào)量,以及讀寫(xiě)鎖。 理解基于讀寫(xiě)鎖的讀者寫(xiě)者問(wèn)題。 一、線程概念 ??

    2024年02月04日
    瀏覽(104)
  • 【并發(fā)編程】線程池多線程異步去分頁(yè)調(diào)用其他服務(wù)接口獲取海量數(shù)據(jù)

    前段時(shí)間在做一個(gè)數(shù)據(jù)同步工具,其中一個(gè)服務(wù)的任務(wù)是調(diào)用A服務(wù)的接口,將數(shù)據(jù)庫(kù)中指定數(shù)據(jù)請(qǐng)求過(guò)來(lái),交給kafka去判斷哪些數(shù)據(jù)是需要新增,哪些數(shù)據(jù)是需要修改的。 剛開(kāi)始的設(shè)計(jì)思路是,,我創(chuàng)建多個(gè)服務(wù)同時(shí)去請(qǐng)求A服務(wù)的接口,每個(gè)服務(wù)都請(qǐng)求到全量數(shù)據(jù),由于這些

    2024年02月13日
    瀏覽(32)
  • 基于多線程并發(fā)-線程同步-系統(tǒng)實(shí)現(xiàn)

    系統(tǒng)實(shí)現(xiàn):相對(duì)于STL來(lái)說(shuō)非標(biāo)準(zhǔn)的實(shí)現(xiàn),Linux和Windows平臺(tái)各自的實(shí)現(xiàn)。 線程同步:通過(guò)限制多個(gè)線程同時(shí)執(zhí)行某段代碼來(lái)保護(hù)資源的。 1、線程互斥量 pthread_mutex_t 的初始化 a、定義再初始化: pthread_mutex_init函數(shù)的第二個(gè)參數(shù)attr是定義互斥鎖的屬性,一般為NULL。成功初始化返

    2024年02月08日
    瀏覽(23)
  • Python異步編程之web框架 異步vs同步 數(shù)據(jù)庫(kù)IO任務(wù)并發(fā)支持對(duì)比

    Python異步編程之web框架 異步vs同步 數(shù)據(jù)庫(kù)IO任務(wù)并發(fā)支持對(duì)比

    主題: 比較異步框架和同步框架在數(shù)據(jù)庫(kù)IO操作的性能差異 python版本 :python 3.8 數(shù)據(jù)庫(kù) :mysql 8.0.27 (docker部署) 壓測(cè)工具 :locust web框架 :同步:flask 異步:starlette 請(qǐng)求并發(fā)量 : 模擬10個(gè)用戶 服務(wù)器配置 : Intel(R) i7-12700F 客戶端配置 :Intel(R) i7-8700 3.20GHz python中操作數(shù)據(jù)庫(kù)通常

    2024年02月08日
    瀏覽(31)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包