1 需求
在項(xiàng)目開(kāi)發(fā)中需要處理100萬(wàn)多的數(shù)據(jù),這些數(shù)據(jù)需要從mysql數(shù)據(jù)庫(kù)中讀取出來(lái),再通過(guò)調(diào)用其他平臺(tái)的接口推送數(shù)據(jù)。由于時(shí)間緊迫,數(shù)據(jù)需要在短時(shí)間內(nèi)完成推送,采用單線程推送很慢,所以采用多線程推送來(lái)提高效率。
2 配置多線程
2.1 application.yml
thread-pool:
core-pool-size: 4
max-pool-size: 16
queue-capacity: 80
keep-alive-seconds: 120
2.2 創(chuàng)建ThreadPoolProperties
import lombok.Data;
import org.springframework.stereotype.Component;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolProperties {
/**
* 線程池創(chuàng)建時(shí)候初始化的線程數(shù)
*/
private int corePoolSize;
/**
* 線程池最大的線程數(shù),只有在緩沖隊(duì)列滿了之后才會(huì)申請(qǐng)超過(guò)核心線程數(shù)的線程
*/
private int maxPoolSize;
/**
* 用來(lái)緩沖執(zhí)行任務(wù)的隊(duì)列
*/
private int queueCapacity;
/**
* 允許線程的空閑時(shí)間:當(dāng)超過(guò)了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會(huì)被銷毀
*/
private int keepAliveSeconds;
}
2.3 創(chuàng)建ThreadPoolConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@Configuration
public class ThreadPoolConfig {
private final ThreadPoolProperties threadPoolProperties;
@Autowired
public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
this.threadPoolProperties = threadPoolProperties;
}
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
executor.setThreadNamePrefix("thread-pool-");
return executor;
}
}
3 多線程批量數(shù)據(jù)處理文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-835171.html
public RequestResult multiThreadPush() {
List<HistoryStudent> historyStudentList = historyStudentMapper.getList(0, 65867);
// 分割集合
List<List<HistoryStudent>> partitionData = partitionData(historyStudentList, 4);
ThreadPoolTaskExecutor executor = SpringUtil.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
// 計(jì)數(shù)器
CountDownLatch latch = new CountDownLatch(partitionData.size());
for (List<HistoryStudent> historyStudents : partitionData) {
executor.execute(() -> {
try {
for (HistoryStudent historyStudent : historyStudents) {
// 單個(gè)數(shù)據(jù)處理
//processSingleData(historyStudent);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return RequestResult.success();
}
private List<List<HistoryStudent>> partitionData(List<HistoryStudent> dataList, int partitionSize) {
List<List<HistoryStudent>> partitions = new ArrayList<>();
int size = dataList.size();
int batchSize = size / partitionSize;
for (int i = 0; i < partitionSize; i++) {
int fromIndex = i * batchSize;
int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;
partitions.add(dataList.subList(fromIndex, toIndex));
}
return partitions;
}
4 參考博客
Java多線程批量處理、線程池的使用
Java多線程處理大批量數(shù)據(jù)
java多線程批量處理數(shù)據(jù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-835171.html
到了這里,關(guān)于Java 多線程批量處理數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!