SpringBoot高效批量插入百萬(wàn)數(shù)據(jù)
前言:我相信很多小伙伴和我一樣在初學(xué)的時(shí)候,面對(duì)幾萬(wàn)幾十萬(wàn)數(shù)據(jù)插入,不知道如何下手,面對(duì)百萬(wàn)級(jí)別數(shù)據(jù)時(shí)更是不知所措,我們大部分初學(xué)者,很多人都喜歡for循環(huán)插入數(shù)據(jù),或者是開(kāi)啟多個(gè)線程,然后分批使用for循環(huán)插入,當(dāng)我們需要將大量數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫(kù)中時(shí),傳統(tǒng)的逐條插入方式顯然效率低下,并且容易導(dǎo)致性能瓶頸。而批量插入是一種更加高效的方式,可以大幅提高數(shù)據(jù)的插入速度,特別是在數(shù)據(jù)量較大的情況下。本文將介紹如何使用 Spring Boot 實(shí)現(xiàn)高效批量插入百萬(wàn)數(shù)據(jù),以解決傳統(tǒng)逐條插入方式存在的性能問(wèn)題。我們將使用不同的插入方式來(lái)比較。
1.拋出問(wèn)題
傳統(tǒng)的單條插入存在什么問(wèn)題:
- 性能問(wèn)題:如果循環(huán)插入的數(shù)據(jù)量比較大,每次插入都需要與數(shù)據(jù)庫(kù)建立連接、執(zhí)行插入操作,這將導(dǎo)致頻繁的網(wǎng)絡(luò)通信和數(shù)據(jù)庫(kù)操作,性能會(huì)受到影響??梢钥紤]批量插入數(shù)據(jù)來(lái)提高性能,例如使用 JDBC 的批處理功能或使用框架提供的批處理方法。
- 事務(wù)問(wèn)題:默認(rèn)情況下,Spring Boot 的事務(wù)管理是基于注解的,每次循環(huán)插入數(shù)據(jù)都會(huì)開(kāi)啟一個(gè)新的事務(wù),這可能導(dǎo)致事務(wù)管理的開(kāi)銷過(guò)大。可以考慮將整個(gè)循環(huán)插入數(shù)據(jù)放在一個(gè)事務(wù)中,或者使用編程式事務(wù)管理來(lái)控制事務(wù)的粒度。
- 數(shù)據(jù)庫(kù)連接問(wèn)題:在循環(huán)過(guò)程中,如果每次都重新獲取數(shù)據(jù)庫(kù)連接,可能會(huì)導(dǎo)致連接資源的浪費(fèi)和性能下降。可以考慮使用連接池技術(shù)來(lái)管理數(shù)據(jù)庫(kù)連接,確保連接的復(fù)用和高效利用。
- 異常處理問(wèn)題:在循環(huán)插入數(shù)據(jù)時(shí),可能會(huì)出現(xiàn)插入失敗、異常等情況。需要適當(dāng)處理異常,例如記錄錯(cuò)誤日志、回滾事務(wù)等,以確保數(shù)據(jù)的完整性和一致性。
2.前期準(zhǔn)備工作
框架:springboot+mybatis plus +mysql
- 準(zhǔn)備工作
創(chuàng)建一個(gè)簡(jiǎn)單的springboot項(xiàng)目,pom依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.15</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 創(chuàng)建測(cè)試需要使用的表
CREATE TABLE `student` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int DEFAULT NULL,
`addr` varchar(255) DEFAULT NULL,
`addr_Num` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8497107 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- 修改application.yml配置
server:
port: 8090
spring:
datasource:
url: jdbc:mysql://localhost:3306/boot_study?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
- 創(chuàng)建實(shí)體類StudentDO
注意:在實(shí)際業(yè)務(wù)中,我們應(yīng)該明確去定義controller service 層的數(shù)據(jù)模型,數(shù)據(jù)傳輸XxxDTO、數(shù)據(jù)表實(shí)體映射XxxDO、返回給前臺(tái)數(shù)據(jù)實(shí)體XxxVO,這些模型數(shù)據(jù)都需要根據(jù)實(shí)際情況在Service實(shí)現(xiàn)類和Controller層轉(zhuǎn)換
這里為了演示方便就不按規(guī)范定義
@TableName(value = "student")
@Data
public class StudentDO {
/** 主鍵 type:自增 */
@TableId(type = IdType.AUTO)
private Integer id;
/** 名字 */
private String name;
/** 年齡 */
private Integer age;
/** 地址 */
private String addr;
/** 地址號(hào) @TableField:與表字段映射 */
@TableField(value = "addr_num")
private String addrNum;
public StudentDO(String name, int age, String addr, String addrNum) {
this.name = name;
this.age = age;
this.addr = addr;
this.addrNum = addrNum;
}
}
- controller定義
@RestController
@RequestMapping("/student")
public class StudentController {
@Autowired
private StudentMapper studentMapper;
@Autowired
private StudentService studentService;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Autowired
private PlatformTransactionManager transactionManager;
}
- service和impl定義
public interface StudentService extends IService<StudentDO> {
}
//實(shí)現(xiàn)定義
@Service
public class StudentServiceImpl extends ServiceImpl<StudentMapper, StudentDO> implements StudentService {
}
- Mapper定義
public interface StudentMapper extends BaseMapper<StudentDO> {
@Insert("<script>" +
"insert into student (name, age, addr, addr_num) values " +
"<foreach collection='studentDOList' item='item' separator=','> " +
"(#{item.name}, #{item.age},#{item.addr}, #{item.addrNum}) " +
"</foreach> " +
"</script>")
public int insertSplice(@Param("studentDOList") List<StudentDO> studentDOList);
}
3.測(cè)試示例演示
模擬100萬(wàn)條數(shù)據(jù)不同方式插入
- for循環(huán)單條插入(不建議)
這里100萬(wàn)數(shù)據(jù)大概要20分鐘以上,所以以10萬(wàn)條數(shù)據(jù)類推
10萬(wàn)條數(shù)據(jù)總耗時(shí)348.864秒
@GetMapping("/for")
public void insertForData () {
long start = System.currentTimeMillis();
for (int i = 0; i < 1000000 ; i++) {
StudentDO StudentDO = new StudentDO("張三"+i, i, "地址"+i, i+"號(hào)");
studentMapper.insert(StudentDO);
}
long end = System.currentTimeMillis();
System.out.println("插入數(shù)據(jù)耗費(fèi)時(shí)間:"+(end-start));
}
結(jié)果:實(shí)際上不知道等了多久很慢很慢,總體時(shí)間差不多半個(gè)多小時(shí),因?yàn)檫@里的for循環(huán)進(jìn)行單條插入時(shí),每次都是在獲取連接(Connection)、釋放連接和資源關(guān)閉等操作上,(如果數(shù)據(jù)量大的情況下)極其消耗資源,導(dǎo)致時(shí)間長(zhǎng)。
2. xml拼接foreach sql插入(大量數(shù)據(jù)不建議)
10萬(wàn)條數(shù)據(jù)插入數(shù)據(jù)耗費(fèi)時(shí)間:3.554秒
@GetMapping("/sql")
public void insertSqlData () {
long start = System.currentTimeMillis();
ArrayList<StudentDO> arrayList = new ArrayList<>();
for (int i = 0; i < 100000 ; i++) {
StudentDO StudentDO = new StudentDO("張三"+i, i, "地址"+i, i+"號(hào)");
arrayList.add(StudentDO);
}
studentMapper.insertSplice(arrayList);
long end = System.currentTimeMillis();
System.out.println("插入數(shù)據(jù)耗費(fèi)時(shí)間:"+(end-start));
}
結(jié)果:我們?cè)贛apper里面是要insert注解拼接,拼接結(jié)果就是將所有的數(shù)據(jù)集成在一條SQL語(yǔ)句的value值上,其由于提交到服務(wù)器上的insert語(yǔ)句少了,相就不需要每次獲取連接(Connection)、釋放連接和資源關(guān)閉,網(wǎng)絡(luò)負(fù)載少了,插入的性能有了提高。但是在數(shù)據(jù)量大的情況下可能會(huì)出現(xiàn)內(nèi)存溢出、解析SQL語(yǔ)句耗時(shí)等情況。
3. mybatis-plus 批量插入saveBatch(推薦)
10萬(wàn)條數(shù)據(jù)插入數(shù)據(jù)耗費(fèi)時(shí)間:2.481秒,在數(shù)據(jù)量不大的情況下和上面差不多
50萬(wàn)條數(shù)據(jù)插入數(shù)據(jù)耗費(fèi)時(shí)間:12.473秒
@GetMapping("/batch")
public void insertSaveBatchData () {
long start = System.currentTimeMillis();
ArrayList<StudentDO> arrayList = new ArrayList<>();
for (int i = 0; i < 100000 ; i++) {
StudentDO StudentDO = new StudentDO("張三"+i, i, "地址"+i, i+"號(hào)");
arrayList.add(StudentDO);
}
studentService.saveBatch(arrayList);
long end = System.currentTimeMillis();
System.out.println("插入數(shù)據(jù)耗費(fèi)時(shí)間:"+(end-start));
}
結(jié)果:使用MyBatis-Plus實(shí)現(xiàn)IService接口中批處理saveBatch()方法,可以很明顯的看到性能有了提升,我們可以查看一下源碼,它的底層實(shí)現(xiàn)原理利用分片處理(batchSize = 1000) + 分批提交事務(wù)的操作,來(lái)提高插入性能,并沒(méi)有在連接上消耗性能,MySQLJDBC驅(qū)動(dòng)默認(rèn)情況下忽略saveBatch()方法中的executeBatch()語(yǔ)句,將需要批量處理的一組SQL語(yǔ)句進(jìn)行拆散,執(zhí)行時(shí)一條一條給MySQL數(shù)據(jù)庫(kù),造成實(shí)際上是分片插入,即與單條插入方式相比,有提高,但是性能未能得到實(shí)質(zhì)性的提高。
- 手動(dòng)開(kāi)啟批處理模式+批量插入手動(dòng)提交(推薦)
10萬(wàn)條數(shù)據(jù)插入數(shù)據(jù)耗費(fèi)時(shí)間:2.481秒,
50萬(wàn)條數(shù)據(jù)插入數(shù)據(jù)耗費(fèi)時(shí)間:13.436秒
和上面相比不管是大數(shù)據(jù)量還是小數(shù)據(jù)量?jī)烧叨际遣畈欢?/li>
@GetMapping("/forSaveBatch")
public void insertforSaveBatchData () {
//創(chuàng)建批量插入SqlSession
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
StudentMapper studentMapper = sqlSession.getMapper(StudentMapper.class);
long start = System.currentTimeMillis();
ArrayList<StudentDO> arrayList = new ArrayList<>();
for (int i = 0; i < 500000 ; i++) {
StudentDO StudentDO = new StudentDO("張三"+i, i, "地址"+i, i+"號(hào)");
studentMapper.insert(StudentDO);
}
sqlSession.commit();
sqlSession.close();
long end = System.currentTimeMillis();
System.out.println("插入數(shù)據(jù)耗費(fèi)時(shí)間:"+(end-start));
}
結(jié)果:手動(dòng)開(kāi)啟批處理,手動(dòng)處理關(guān)閉自動(dòng)提交事務(wù),共用同一個(gè)SqlSession之后,for循環(huán)單條插入的性能得到實(shí)質(zhì)性的提高;由于同一個(gè)SqlSession省去對(duì)資源相關(guān)操作的耗能、減少對(duì)事務(wù)處理的時(shí)間等,從而極大程度上提高執(zhí)行效率。
5. ThreadPoolTaskExecutor分割多線程插入(大數(shù)據(jù)量強(qiáng)烈推薦)
50萬(wàn)條數(shù)據(jù)插入數(shù)據(jù)耗費(fèi)時(shí)間:3。536秒,插入速度直接是前面的4倍,是不是很疑惑這樣就快了這么多?
原理:多線程批量插入的過(guò)程,首先定義了一個(gè)線程池(ThreadPoolTaskExecutor),用于管理線程的生命周期和執(zhí)行任務(wù)。然后,我們將要插入的數(shù)據(jù)列表按照指定的批次大小分割成多個(gè)子列表,并開(kāi)啟多個(gè)線程來(lái)執(zhí)行插入操作,通過(guò) TransactionManager 獲取事務(wù)管理器,并使用 TransactionDefinition 定義事務(wù)屬性。然后,在每個(gè)線程中,我們通過(guò) transactionManager.getTransaction() 方法獲取事務(wù)狀態(tài),并在插入操作中使用該狀態(tài)來(lái)管理事務(wù)。
在插入操作完成后,我們?cè)俑鶕?jù)操作結(jié)果調(diào)用transactionManager.commit()或 transactionManager.rollback() 方法來(lái)提交或回滾事務(wù)。在每個(gè)線程執(zhí)行完畢后,都會(huì)調(diào)用 CountDownLatch 的 countDown() 方法,以便主線程等待所有線程都執(zhí)行完畢后再返回。
@GetMapping("/threadPoolInsert")
public void insertThreadPoolBatchData () {
ArrayList<StudentDO> arrayList = new ArrayList<>();
for (int i = 0; i < 500000 ; i++) {
StudentDO StudentDO = new StudentDO("張三"+i, i, "武漢"+i, i+"號(hào)");
arrayList.add(StudentDO);
}
int count = arrayList.size();
int pageSize = 10000;
int threadNum = count % pageSize == 0 ? count / pageSize: count / pageSize + 1;
CountDownLatch downLatch = new CountDownLatch(threadNum);
long start = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
//開(kāi)始序號(hào)
int startIndex = i * pageSize;
//結(jié)束序號(hào)
int endIndex = Math.min(count, (i+1)*pageSize);
//分割list
List<StudentDO> StudentDOs = arrayList.subList(startIndex, endIndex);
taskExecutor.execute(() -> {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(definition);
try {
studentMapper.insertSplice(StudentDOs);
transactionManager.commit(status);
}catch (Exception e){
transactionManager.rollback(status);
e.printStackTrace();
}finally {
//執(zhí)行完后 計(jì)數(shù)
downLatch.countDown();
}
});
}
try {
//等待
downLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
long end = System.currentTimeMillis();
System.out.println("插入數(shù)據(jù)耗費(fèi)時(shí)間:"+(end-start));
}
- ThreadPoolTaskExecutor分割異步插入
除了上面多線程分割插入,我們也可以使用多線程異步插入其實(shí)和上面插入的原理是差不多,下面演示異步插入- 修改application.yml增加配置
這個(gè)參數(shù)根據(jù)自己的電腦配置合理設(shè)置
- 修改application.yml增加配置
async:
executor:
thread:
core_pool_size: 35
max_pool_size: 35
queue_capacity: 99999
name:
prefix: async-testDB-
- 自定義ThreadPoolTaskExecutor配置
@EnableAsync
@Configuration
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//設(shè)置核心線程數(shù)
taskExecutor.setCorePoolSize(corePoolSize);
//設(shè)置最大線程數(shù)
taskExecutor.setMaxPoolSize(maxPoolSize);
//設(shè)置隊(duì)列容量
taskExecutor.setQueueCapacity(queueCapacity);
//設(shè)置線程名前綴
taskExecutor.setThreadNamePrefix(namePrefix);
//設(shè)置拒絕策略
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return taskExecutor;
}
}
- 定義異步service和實(shí)現(xiàn)類
//接口服務(wù)
public interface AsyncService {
void executeAsync(List<StudentDO> studentDOList, CountDownLatch countDownLatch);
}
//實(shí)現(xiàn)類
@Service
public class AsyncServiceImpl extends ServiceImpl implements AsyncService {
@Autowired
private StudentService studentService;
@Async("asyncServiceExecutor")
@Override
public void executeAsync(List<StudentDO> studentDOList, CountDownLatch countDownLatch) {
try{
//異步線程要做的事情
studentService.saveBatch(studentDOList);
}finally {
countDownLatch.countDown();// 很關(guān)鍵, 無(wú)論上面程序是否異常必須執(zhí)行countDown,否則await無(wú)法釋放
}
}
}
- control代碼
這里需要注意修改, 因?yàn)槲覀冊(cè)贓xecutorConfig配置類里面重新設(shè)置了ThreadPoolTaskExecutor
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
改為
@Autowired
private Executor taskExecutor;
@GetMapping("/asyncInsertData")
public void asyncInsertData() {
List<StudentDO> studentDOList = getTestData();
//測(cè)試每100條數(shù)據(jù)插入開(kāi)一個(gè)線程
long start = System.currentTimeMillis();
List<List<StudentDO>> lists = ListUtil.split(studentDOList, 10000);
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
for (List<StudentDO> listSub:lists) {
asyncService.executeAsync(listSub,countDownLatch);
}
try {
countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會(huì)走下面的;
// 這樣就可以在下面拿到所有線程執(zhí)行完的集合結(jié)果
} catch (Exception e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("插入數(shù)據(jù)耗費(fèi)時(shí)間:"+(end-start));
}
public List<StudentDO> getTestData() {
ArrayList<StudentDO> arrayList = new ArrayList<>();
for (int i = 0; i < 500000 ; i++) {
StudentDO studentDO = new StudentDO("張三"+i, i, "武漢"+i, i+"號(hào)");
arrayList.add(studentDO);
}
return arrayList;
}
50萬(wàn)條數(shù)據(jù)插入數(shù)據(jù)耗費(fèi)時(shí)間:2.604秒,這里插入和上面差不多因?yàn)樗麄兪褂玫亩际嵌嗑€程插入
總結(jié):經(jīng)過(guò)上面的示例演示我們心里已經(jīng)有譜了,知道什么時(shí)候該使用哪一種數(shù)據(jù)插入方式,針對(duì)對(duì)不同線程數(shù)的測(cè)試,發(fā)現(xiàn)不是線程數(shù)越多越好,具體多少合適,通常的算法:CPU核心數(shù)量*2 +2 個(gè)線程。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-854979.html
實(shí)際要根據(jù)每個(gè)人的電腦配置情況設(shè)置合適的線程數(shù),可以根據(jù)下面這個(gè)公式獲?。?span toymoban-style="hidden">文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-854979.html
int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用處理器的Java虛擬機(jī)的數(shù)量
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
到了這里,關(guān)于SpringBoot高效批量插入百萬(wàn)數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!