需求背景:新增了ES,現在要將數據庫某張表的數據同步到ES中。百萬級的數據量一次性讀取同步肯定不行,所以可以用多線程並行執行數據同步。
1.線程池配置類
/**
 * Spring configuration that exposes the thread pool used to run the
 * Elasticsearch bulk-import tasks.
 */
@Configuration
public class ThreadPoolConfig {
    /** Number of core threads kept in the pool. */
    private static final int CORE_POOL_SIZE = 17;
    /** Maximum number of threads the pool may create. */
    private static final int MAX_POOL_SIZE = 50;
    /** Maximum number of tasks waiting in the queue. */
    private static final int QUEUE_CAPACITY = 1000;
    /** Idle time, in seconds, after which threads above the core size are released. */
    private static final int KEEP_ALIVE_SECONDS = 500;

    /**
     * Creates the bounded thread pool registered as the {@code taskExecutor} bean.
     *
     * <p>When both the pool and the queue are full, the submitting thread runs the
     * task itself instead of the task being dropped. Callers that wait on a
     * {@link java.util.concurrent.CountDownLatch} sized to the number of submitted
     * tasks would otherwise block forever on a silently discarded task.
     *
     * @return the executor used for the import tasks
     */
    @Bean("taskExecutor")
    public ExecutorService executorService() {
        // Atomic counter so every worker thread gets a unique, sequential name.
        AtomicInteger threadNumber = new AtomicInteger(1);
        // Bounded linked queue: caps memory use when producers outpace the workers.
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        return new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                // The constant is expressed in seconds, so the unit must be seconds too.
                TimeUnit.SECONDS,
                queue,
                r -> new Thread(r, "es-pool-" + threadNumber.getAndIncrement()),
                // Back-pressure instead of data loss: never drop a submitted task.
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}
2.ES配置類(文章來源:http://www.zghlxwxcb.cn/news/detail-821558.html)
/**
 * Elasticsearch connection settings, bound from configuration properties under
 * the {@code elasticsearch} prefix, plus the client bean built from them.
 */
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {
    /** Host of the Elasticsearch node ({@code elasticsearch.host}). */
    private String host;
    /** Port of the Elasticsearch node ({@code elasticsearch.port}). */
    private int port;

    /**
     * Builds the high-level REST client that talks to the configured node over
     * plain HTTP.
     *
     * @return a client pointing at {@code host:port}
     */
    @Bean
    public RestHighLevelClient client() {
        HttpHost node = new HttpHost(host, port, "http");
        return new RestHighLevelClient(RestClient.builder(node));
    }
}
3.主要代碼邏輯(文章來源地址:http://www.zghlxwxcb.cn/news/detail-821558.html)
@Service
@Transactional
@Slf4j
public class TestService{
@Autowired
private TestMapper testMapper;
@Autowired
private RestHighLevelClient client; //ES客戶端
@Autowired
private ExecutorService executorService; //線程池
private static final String ARTICLE_ES_INDEX = "test_info";//ES索引庫(kù)名稱
private static final int PAGE_SIZE = 5000; //每頁(yè)記錄數(shù)
/**
* 批量導(dǎo)入邏輯
*/
public void importAll() {
//查詢數(shù)據(jù)總數(shù)
int count = testMapper.selectCount();
//總頁(yè)數(shù)用 數(shù)據(jù)庫(kù)數(shù)據(jù)總數(shù)%每頁(yè)記錄數(shù)
int totalPageSize = count % PAGE_SIZE == 0 ? count / PAGE_SIZE : count / PAGE_SIZE + 1;
//記錄開(kāi)始執(zhí)行時(shí)間
long startTime = System.currentTimeMillis();
//一共有多少頁(yè),就創(chuàng)建多少個(gè)CountDownLatch的計(jì)數(shù)
CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);
int fromIndex;
List<TestVo> testVoList= null;
for (int i = 0; i < totalPageSize; i++) {
//起始分頁(yè)條數(shù)
fromIndex = i * PAGE_SIZE;
//查詢數(shù)據(jù)庫(kù)當(dāng)前頁(yè)數(shù)的數(shù)據(jù) SELECT*FROM 表名 LIMIT fromIndex,PAGE_SIZE
testVoList= testMapper.selectCurrentData(fromIndex, PAGE_SIZE);
//創(chuàng)建線程,做批量插入es數(shù)據(jù)操作
TaskThread taskThread = new TaskThread(testVoList, countDownLatch);
//把當(dāng)前線程任務(wù)交由線程池執(zhí)行
executorService.execute(taskThread);
}
//調(diào)用await()方法,用來(lái)等待計(jì)數(shù)歸零
countDownLatch.await();
long endTime = System.currentTimeMillis();
log.info("es索引數(shù)據(jù)批量導(dǎo)入共:{}條,共消耗時(shí)間:{}秒", count, (endTime - startTime) / 1000);
}
//這里為了方便,寫了線程內(nèi)部類。
class TaskThread implements Runnable {
List<TestVo> testVoList;
CountDownLatch cdl;
//數(shù)據(jù)和倒計(jì)時(shí)鎖
public TaskThread(List<TestVo> testVoList, CountDownLatch cdl) {
this.articleList = articleList;
this.cdl = cdl;
}
@Override
public void run() {
//創(chuàng)建ES對(duì)象,并指定名稱
BulkRequest bulkRequest = new BulkRequest(ARTICLE_ES_INDEX);
for (SearchArticleVo searchArticleVo : articleList) {
//存儲(chǔ)到ES
bulkRequest.add(new IndexRequest().id(searchArticleVo.getId().toString())
.source(JSON.toJSONString(testVoList), XContentType.JSON));
}
//發(fā)送請(qǐng)求,批量添加數(shù)據(jù)到es索引庫(kù)中
client.bulk(bulkRequest, RequestOptions.DEFAULT);
//添加成功后計(jì)數(shù)減一
cdl.countDown();
}}}
到了這裡,關於多線程批量同步數據到ES的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持TOY模板網!