国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

(線程池)多線程使用場景--es數(shù)據(jù)批量導入、數(shù)據(jù)匯總、異步調(diào)用;如何控制某個方法允許并發(fā)訪問線程的數(shù)量;對ThreadLocal的理解及實現(xiàn)原理、源碼解析、ThreadLocal的內(nèi)存泄露問題

這篇具有很好參考價值的文章主要介紹了(線程池)多線程使用場景--es數(shù)據(jù)批量導入、數(shù)據(jù)匯總、異步調(diào)用;如何控制某個方法允許并發(fā)訪問線程的數(shù)量;對ThreadLocal的理解及實現(xiàn)原理、源碼解析、ThreadLocal的內(nèi)存泄露問題。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

線程池使用場景(CountDownLatch、Future)

CountDownLatch(閉鎖/倒計時鎖) 用來進行線程同步協(xié)作,等待所有線程完成倒計時(一個或者多個線程,等待其他多個線程完成某件事情之后才能執(zhí)行)

  • 其中構(gòu)造參數(shù)用來初始化等待計數(shù)值

  • await() 用來等待計數(shù)歸零

  • countDown() 用來讓計數(shù) 減一
    分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        //初始化了一個倒計時鎖 參數(shù)為 3
        CountDownLatch latch = new CountDownLatch(3);

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            //count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            //count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            //count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        String name = Thread.currentThread().getName();
        System.out.println(name + "-waiting...");
        //等待其他線程完成
        latch.await();
        System.out.println(name + "-wait end...");
    }
    
}

多線程使用場景一:( es數(shù)據(jù)批量導入)

在我們項目上線之前,我們需要把數(shù)據(jù)庫中的數(shù)據(jù)一次性的同步到es索引庫中,但是當時的數(shù)據(jù)好像是1000萬左右,一次性讀取數(shù)據(jù)肯定不行(oom異常,內(nèi)存溢出),當時我就想到可以使用線程池的方式導入,利用CountDownLatch來控制,就能避免一次性加載過多,防止內(nèi)存溢出。

整體流程就是通過CountDownLatch+線程池配合去執(zhí)行

分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud
分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud

多線程使用場景二(數(shù)據(jù)匯總)

在一個電商網(wǎng)站中,用戶下單之后,需要查詢數(shù)據(jù),數(shù)據(jù)包含了三部分:訂單信息、包含的商品、物流信息;這三塊信息都在不同的微服務(wù)中進行實現(xiàn)的,我們?nèi)绾瓮瓿蛇@個業(yè)務(wù)呢?
分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud
在實際開發(fā)的過程中,難免需要調(diào)用多個接口來匯總數(shù)據(jù),如果所有接口(或部分接口)的沒有依賴關(guān)系,就可以使用線程池+future來提升性能

多線程使用場景三(異步調(diào)用)

分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud
在進行搜索的時候,需要保存用戶的搜索記錄,而搜索記錄不能影響用戶的正常搜索,我們通常會開啟一個線程去執(zhí)行歷史記錄的保存,在新開啟的線程在執(zhí)行的過程中,可以利用線程提交任務(wù)。

多線程使用的場景總結(jié):

  • **批量導入:**使用了線程池+CountDownLatch批量把數(shù)據(jù)庫中的數(shù)據(jù)導入到了ES(任意)中,避免OOM
  • **數(shù)據(jù)匯總:**調(diào)用多個接口來匯總數(shù)據(jù),如果所有接口(或部分接口)的沒有依賴關(guān)系,就可以使用線程池+future來提升性能
  • **異步線程(線程池)︰**為了避免下一級方法影響上一級方法(性能考慮),可使用異步線程調(diào)用下一個方法(不需要下一級方法返回值),可以提升方法響應(yīng)時間

如何控制某個方法允許并發(fā)訪問線程的數(shù)量

Semaphore [?s?m??f?r] 信號量,是JUC包下的一個工具類,我們可以通過其限制執(zhí)行的線程數(shù)量,達到限流的效果。
當一個線程執(zhí)行時先通過其方法進行獲取許可操作,獲取到許可的線程繼續(xù)執(zhí)行業(yè)務(wù)邏輯,當線程執(zhí)行完成后進行釋放許可操作,未獲取達到許可的線程進行等待或者直接結(jié)束。

Semaphore兩個重要的方法

  • lsemaphore.acquire(): 請求一個信號量,這時候的信號量個數(shù)-1(一旦沒有可使用的信號量,也即信號量個數(shù)變?yōu)樨摂?shù)時,再次請求的時候就會阻塞,直到其他線程釋放了信號量)

  • lsemaphore.release(): 釋放一個信號量,此時信號量個數(shù)+1

線程任務(wù)類:

public class SemaphoreCase {
    public static void main(String[] args) {
        // 1. 創(chuàng)建 semaphore 對象
        Semaphore semaphore = new Semaphore(3);
        // 2. 10個線程同時運行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {

                try {
                    // 3. 獲取許可
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println("running...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("end...");
                } finally {
                    // 4. 釋放許可
                    semaphore.release();
                }
            }).start();
        }
    }

}

對ThreadLocal的理解

ThreadLocal是多線程中對于解決線程安全的一個操作類,它會為每個線程都分配一個獨立的線程副本從而解決了變量并發(fā)訪問沖突的問題。ThreadLocal 同時實現(xiàn)了線程內(nèi)的資源共享

案例:使用JDBC操作數(shù)據(jù)庫時,會將每一個線程的Connection放入各自的ThreadLocal中,從而保證每個線程都在各自的 Connection 上進行數(shù)據(jù)庫的操作,避免A線程關(guān)閉了B線程的連接。
分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud
三個主要方法:

  • set(value) 設(shè)置值

  • get() 獲取值

  • remove() 清除值

public class ThreadLocalTest {
    static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            threadLocal.set("itcast");
            print(name);
            System.out.println(name + "-after remove : " + threadLocal.get());
        }, "t1").start();
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            threadLocal.set("itheima");
            print(name);
            System.out.println(name + "-after remove : " + threadLocal.get());
        }, "t2").start();
    }

    static void print(String str) {
        //打印當前線程中本地內(nèi)存中本地變量的值
        System.out.println(str + " :" + threadLocal.get());
        //清除本地內(nèi)存中的本地變量
        threadLocal.remove();
    }

}

ThreadLocal的實現(xiàn)原理&源碼解析

ThreadLocal本質(zhì)來說就是一個線程內(nèi)部存儲類,從而讓多個線程只操作自己內(nèi)部的值,從而實現(xiàn)線程數(shù)據(jù)隔離
分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud
在ThreadLocal中有一個內(nèi)部類叫做ThreadLocalMap,類似于HashMap

ThreadLocalMap中有一個屬性 table數(shù)組 ,這個是真正 存儲數(shù)據(jù) 的位置

set方法
分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud
get方法/remove方法
分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud

ThreadLocal的內(nèi)存泄露問題

Java對象中的四種引用類型:強引用、軟引用、弱引用、虛引用

  • 強引用:最為普通的引用方式,表示一個對象處于有用且必須的狀態(tài),如果一個對象具有強引用,則GC并不會回收它。即便堆中內(nèi)存不足了,寧可出現(xiàn)OOM,也不會對其進行回收
User user = new User();
  • 弱引用:表示一個對象處于可能有用且非必須的狀態(tài)。在GC線程掃描內(nèi)存區(qū)域時,一旦發(fā)現(xiàn)弱引用,就會回收到弱引用相關(guān)聯(lián)的對象。對于弱引用的回收,無關(guān)內(nèi)存區(qū)域是否足夠,一旦發(fā)現(xiàn)則會被回收
User user = new User();
WeakReference weakReference = new WeakReference(user);

每一個Thread維護一個ThreadLocalMap,在ThreadLocalMap中的Entry對象繼承WeakReference。其中key為使用弱引用的ThreadLocal實例,value為線程變量的副本。
分布式es數(shù)據(jù)批量導入場景,java,數(shù)據(jù)庫,hash,spring boot,spring cloud

在使用ThreadLocal的時候,強烈建議:務(wù)必手動remove文章來源地址http://www.zghlxwxcb.cn/news/detail-857700.html

到了這里,關(guān)于(線程池)多線程使用場景--es數(shù)據(jù)批量導入、數(shù)據(jù)匯總、異步調(diào)用;如何控制某個方法允許并發(fā)訪問線程的數(shù)量;對ThreadLocal的理解及實現(xiàn)原理、源碼解析、ThreadLocal的內(nèi)存泄露問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【SpringBoot】springboot數(shù)據(jù)使用多線程批量入數(shù)據(jù)庫

    springboot、mybatisPlus、mysql8 mysql8(部署在1核2G的服務(wù)器上,很卡,所以下面的數(shù)據(jù)條數(shù)用5000,太大怕不是要等到花兒都謝了 0.0) 共耗時:180121 ms 耗時時間:87217ms 耗時時間: 28235 可見時間從180秒,縮短到了28秒,但是@Transactional對于多線程是控制不了所有的事務(wù)的。 Spring實現(xiàn)

    2024年02月02日
    瀏覽(24)
  • StarRocks案例7:使用shell批量broker load導入hdfs數(shù)據(jù)

    近期需要進行補錄數(shù)據(jù),需要將hive的歷史數(shù)據(jù)遷移到StarRocks,因為需要補錄的數(shù)據(jù)較多,hive和StarRocks均使用的是分區(qū)表,兩邊的日期格式也不同,hive這邊是 yyyymmdd格式,StarRocks這邊是yyyy-mm-dd格式。 之前一直是使用DataX來從hive導入到StarRocks,因為DataX是單節(jié)點的,而hive和S

    2024年02月11日
    瀏覽(15)
  • 補充:es與mysql之間的數(shù)據(jù)同步 2 使用分頁導入的方式把大量數(shù)據(jù)從mysql導入es

    本片文章只是對之前寫的文章的補充, es與mysql之間的數(shù)據(jù)同步 http://t.csdn.cn/npHt4 補充一: 之前的文章對于交換機、隊列、綁定,使用的是@bean, 而這里使用的是純注解版 在消費方,聲明交換機: 補充二: 之前的文章是直接使用es操作數(shù)據(jù),新增和修改,這樣做不是很合適

    2024年02月12日
    瀏覽(24)
  • 【二十四】springboot使用EasyExcel和線程池實現(xiàn)多線程導入Excel數(shù)據(jù)

    【二十四】springboot使用EasyExcel和線程池實現(xiàn)多線程導入Excel數(shù)據(jù)

    ??springboot篇章整體欄目:? 【一】springboot整合swagger(超詳細 【二】springboot整合swagger(自定義)(超詳細) 【三】springboot整合token(超詳細) 【四】springboot整合mybatis-plus(超詳細)(上) 【五】springboot整合mybatis-plus(超詳細)(下) 【六】springboot整合自定義全局異常

    2023年04月08日
    瀏覽(23)
  • 使用elasticdump實現(xiàn)es數(shù)據(jù)導入導出示例(持續(xù)更新中)

    Elasticdump是一個命令行工具,可用于將數(shù)據(jù)從Elasticsearch導出到JSON文件,以及將JSON文件導入到Elasticsearch中。以下是一個簡單的示例,演示如何使用Elasticdump實現(xiàn)數(shù)據(jù)導入導出: 您可以使用npm命令在命令行中安裝Elasticdump。(npm請自行安裝。)例如,使用以下命令安裝最新版本

    2023年04月11日
    瀏覽(25)
  • java中多線程去跑海量數(shù)據(jù)使用線程池批量ThreadPoolExecutor處理的方式和使用Fork/Join框架的方式那種效率高?

    在Java中,使用線程池(ThreadPoolExecutor)和使用Fork/Join框架來處理海量數(shù)據(jù)的效率取決于具體的應(yīng)用場景和需求。下面是一些需要考慮的因素: 任務(wù)類型:如果任務(wù)是CPU密集型的,那么使用Fork/Join框架可能更高效,因為它可以自動進行任務(wù)分割和并行處理。如果任務(wù)是I/O密集

    2024年02月10日
    瀏覽(21)
  • 使用Logstash和JDBC將MySQL的數(shù)據(jù)導入到Elasticsearch(ES)的過程

    使用Logstash和JDBC將MySQL的數(shù)據(jù)導入到Elasticsearch(ES)的過程包含多個步驟。請注意,首先你需要準備好的JDBC驅(qū)動,Logstash實例,Elasticsearch實例,以及你希望導入的MySQL數(shù)據(jù)。 安裝Logstash JDBC Input Plugin :Logstash包含大量插件,其中一個就是JDBC Input Plugin,可以用于從JDBC兼容的數(shù)據(jù)庫

    2024年02月15日
    瀏覽(34)
  • 使用分頁導入的方式把大量數(shù)據(jù)從mysql導入單點的es時報錯:Connection refused: no further information

    使用分頁導入的方式把大量數(shù)據(jù)從mysql導入單點的es時報錯:Connection refused: no further information

    我出現(xiàn)的問題: 意思是, 拒絕連接:沒有進一步的信息 我的解決方案是:在yml文件中配置以下信息,問題就可以解決 但是,我水品有限,沒有明白什么原因,還有這個配置文件中的內(nèi)容也不是很清楚,如果有路過的大佬,原因耽誤寶貴的時間,給小弟解釋一下,小弟不勝感

    2024年02月11日
    瀏覽(37)
  • Elasticsearch 批量導入數(shù)據(jù)

    **Elasticsearch**是一款非常高效的全文檢索引擎。 **Elasticsearch**可以非常方便地進行數(shù)據(jù)的多維分析,所以大數(shù)據(jù)分析領(lǐng)域也經(jīng)常會見到它的身影,生產(chǎn)環(huán)境中絕大部分新產(chǎn)生的數(shù)據(jù)可以通過應(yīng)用直接導入,但是歷史或初始數(shù)據(jù)可能會需要單獨處理,這種情況下可能遇到需要導

    2023年04月11日
    瀏覽(18)
  • 【大數(shù)據(jù)】Hive 中的批量數(shù)據(jù)導入

    在博客【大數(shù)據(jù)】Hive 表中插入多條數(shù)據(jù) 中,我簡單介紹了幾種向 Hive 表中插入數(shù)據(jù)的方法。然而更多的時候,我們并不是一條數(shù)據(jù)一條數(shù)據(jù)的插入,而是以批量導入的方式。在本文中,我將較為全面地介紹幾種向 Hive 中批量導入數(shù)據(jù)的方法。 overwrite :表示覆蓋表中已有數(shù)

    2024年02月11日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包