国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

多線程批量同步數(shù)據(jù)到ES

這篇具有很好參考價(jià)值的文章主要介紹了多線程批量同步數(shù)據(jù)到ES。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

需求背景:新增了ES,現(xiàn)在要講數(shù)據(jù)庫(kù)某張表的數(shù)據(jù)同步到ES中,百萬(wàn)級(jí)的數(shù)據(jù)量一次性讀取同步肯定不行,所以可以用多線程同步執(zhí)行同步數(shù)據(jù)。

1.線程池配置類

@Configuration
public class ThreadPoolConfig {

    /**
     * 核心線程池大小
     */
    private static final int CORE_POOL_SIZE = 17;

    /**
     * 最大可創(chuàng)建的線程數(shù)
     */
    private static final int MAX_POOL_SIZE = 50;

    /**
     * 隊(duì)列最大長(zhǎng)度
     */
    private static final int QUEUE_CAPACITY = 1000;

    /**
     * 線程池維護(hù)線程所允許的空閑時(shí)間
     */
    private static final int KEEP_ALIVE_SECONDS = 500;

    @Bean("taskExecutor")
    public ExecutorService executorService(){
        //使用原子類,保證線程命名的唯一性和連續(xù)性
        AtomicInteger c = new AtomicInteger(1);
        //創(chuàng)建鏈表結(jié)構(gòu)的阻塞隊(duì)列
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY);
        return new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.MILLISECONDS,
                queue,
                r -> new Thread(r, "es-pool-" + c.getAndIncrement()),
                new ThreadPoolExecutor.DiscardPolicy()
        );
    }
}

2.ES配置類

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {
    private String host;
    private int port;

    @Bean
    public RestHighLevelClient client(){
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(
                        host,
                        port,
                        "http"
                )
        ));
    }
}

3.主要代碼邏輯文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-821558.html

@Service
@Transactional
@Slf4j
public class TestService{
    @Autowired
    private TestMapper testMapper;
    @Autowired
    private RestHighLevelClient client; //ES客戶端
    @Autowired
    private ExecutorService executorService; //線程池

    private static final String ARTICLE_ES_INDEX = "test_info";//ES索引庫(kù)名稱

    private static final int PAGE_SIZE = 5000; //每頁(yè)記錄數(shù)

    /**
     * 批量導(dǎo)入邏輯
     */
    public void importAll() {
        //查詢數(shù)據(jù)總數(shù)
        int count = testMapper.selectCount();
        //總頁(yè)數(shù)用 數(shù)據(jù)庫(kù)數(shù)據(jù)總數(shù)%每頁(yè)記錄數(shù)
        int totalPageSize = count % PAGE_SIZE == 0 ? count / PAGE_SIZE : count / PAGE_SIZE + 1;
        //記錄開(kāi)始執(zhí)行時(shí)間
        long startTime = System.currentTimeMillis();
        //一共有多少頁(yè),就創(chuàng)建多少個(gè)CountDownLatch的計(jì)數(shù)
        CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);

        int fromIndex;
        List<TestVo> testVoList= null;
        for (int i = 0; i < totalPageSize; i++) {
            //起始分頁(yè)條數(shù)
            fromIndex = i * PAGE_SIZE;
            //查詢數(shù)據(jù)庫(kù)當(dāng)前頁(yè)數(shù)的數(shù)據(jù)  SELECT*FROM 表名 LIMIT fromIndex,PAGE_SIZE
            testVoList= testMapper.selectCurrentData(fromIndex, PAGE_SIZE);
            //創(chuàng)建線程,做批量插入es數(shù)據(jù)操作
            TaskThread taskThread = new TaskThread(testVoList, countDownLatch);
            //把當(dāng)前線程任務(wù)交由線程池執(zhí)行
            executorService.execute(taskThread);
        }
        //調(diào)用await()方法,用來(lái)等待計(jì)數(shù)歸零
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        log.info("es索引數(shù)據(jù)批量導(dǎo)入共:{}條,共消耗時(shí)間:{}秒", count, (endTime - startTime) / 1000);
    }


    //這里為了方便,寫了線程內(nèi)部類。
    class TaskThread implements Runnable {
        List<TestVo> testVoList;
        CountDownLatch cdl;
		//數(shù)據(jù)和倒計(jì)時(shí)鎖
        public TaskThread(List<TestVo> testVoList, CountDownLatch cdl) {
            this.articleList = articleList;
            this.cdl = cdl;
        }
        @Override
        public void run() {
            //創(chuàng)建ES對(duì)象,并指定名稱
            BulkRequest bulkRequest = new BulkRequest(ARTICLE_ES_INDEX);
            for (SearchArticleVo searchArticleVo : articleList) {
                //存儲(chǔ)到ES
                bulkRequest.add(new IndexRequest().id(searchArticleVo.getId().toString())
                        .source(JSON.toJSONString(testVoList), XContentType.JSON));
            }
            //發(fā)送請(qǐng)求,批量添加數(shù)據(jù)到es索引庫(kù)中
            client.bulk(bulkRequest, RequestOptions.DEFAULT);
            //添加成功后計(jì)數(shù)減一
            cdl.countDown();
        }}}

到了這里,關(guān)于多線程批量同步數(shù)據(jù)到ES的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包