環(huán)境
springboot、mybatisPlus、mysql8
mysql8(部署在1核2G的服務器上,很卡,所以下面的數(shù)據(jù)條數(shù)用5000,太大怕不是要等到花兒都謝了 0.0)
原始的for循環(huán)入庫
@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
@Override
@Transactional(rollbackFor = Exception.class)
public Object doTest() {
long start = System.currentTimeMillis();
List<MoreTestEntity> entityList = new ArrayList<>();
for (int i = 0; i < 5000; i++) {
MoreTestEntity entity = new MoreTestEntity();
entity.setId((long) i);
entity.setA(UUID.randomUUID().toString());
entity.setB(UUID.randomUUID().toString());
entity.setC(UUID.randomUUID().toString());
entity.setD(UUID.randomUUID().toString());
entity.setE(UUID.randomUUID().toString());
entity.setF(UUID.randomUUID().toString());
entity.setG(UUID.randomUUID().toString());
entity.setH(UUID.randomUUID().toString());
entity.setI(UUID.randomUUID().toString());
entity.setJ(UUID.randomUUID().toString());
entity.setK(UUID.randomUUID().toString());
entityList.add(entity);
//在循環(huán)中入庫
baseMapper.insert(entity);
}
long end = System.currentTimeMillis();
System.err.println(end - start);
return end - start;
}
}
共耗時:180121 ms
批量保存操作
@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
@Override
@Transactional(rollbackFor = Exception.class)
public Object doTest() {
long start = System.currentTimeMillis();
List<MoreTestEntity> entityList = new ArrayList<>();
for (int i = 0; i < 5000; i++) {
MoreTestEntity entity = new MoreTestEntity();
entity.setId((long) i);
entity.setA(UUID.randomUUID().toString());
entity.setB(UUID.randomUUID().toString());
entity.setC(UUID.randomUUID().toString());
entity.setD(UUID.randomUUID().toString());
entity.setE(UUID.randomUUID().toString());
entity.setF(UUID.randomUUID().toString());
entity.setG(UUID.randomUUID().toString());
entity.setH(UUID.randomUUID().toString());
entity.setI(UUID.randomUUID().toString());
entity.setJ(UUID.randomUUID().toString());
entity.setK(UUID.randomUUID().toString());
entityList.add(entity);
}
//mybatisPlus提供的批量保存方法,數(shù)字代表每幾條數(shù)據(jù)提交一次事務,默認1000
saveBatch(entityList, 1000);
long end = System.currentTimeMillis();
System.err.println(end - start);
return end - start;
}
}
耗時時間:87217ms
在批量插入的基礎(chǔ)上使用多線程
@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
@Override
@Transactional(rollbackFor = Exception.class)
public Object doTest() throws InterruptedException {
long start = System.currentTimeMillis();
//手動創(chuàng)建線程池,注意你 數(shù)據(jù)庫連接池的 允許連接數(shù)量,別超過了就行。
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
5,
5,
30,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10),
//isDaemon 設(shè)置線程是否是守護線程,true的話,主線程結(jié)束,new的線程就不會繼續(xù)工作
new NamedThreadFactory("執(zhí)行線程", false),
(r, executor) -> System.out.println("拒絕" + r));
List<MoreTestEntity> entityList = new ArrayList<>();
for (int i = 0; i < 5000; i++) {
MoreTestEntity entity = new MoreTestEntity();
entity.setId((long) i);
entity.setA(UUID.randomUUID().toString());
entity.setB(UUID.randomUUID().toString());
entity.setC(UUID.randomUUID().toString());
entity.setD(UUID.randomUUID().toString());
entity.setE(UUID.randomUUID().toString());
entity.setF(UUID.randomUUID().toString());
entity.setG(UUID.randomUUID().toString());
entity.setH(UUID.randomUUID().toString());
entity.setI(UUID.randomUUID().toString());
entity.setJ(UUID.randomUUID().toString());
entity.setK(UUID.randomUUID().toString());
entityList.add(entity);
}
//拆分list,將其拆分成5份,然后上面線程池創(chuàng)建也是5個核心線程,剛好執(zhí)行
List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 1000);
//使用CountDownLatch保證所有線程都執(zhí)行完成
CountDownLatch latch = new CountDownLatch(5);
partition.forEach(item -> {
poolExecutor.execute(() -> {
saveBatch(item, 1000);
latch.countDown();
});
});
latch.await();
// 也可以這么寫,設(shè)定超時時間
//latch.await(100,TimeUnit.SECONDS);
long end = System.currentTimeMillis();
System.err.println(end - start);
//關(guān)閉線程池
poolExecutor.shutdown();
return end - start;
}
}
耗時時間: 28235
可見時間從180秒,縮短到了28秒,但是@Transactional對于多線程是控制不了所有的事務的。
Spring實現(xiàn)事務的原理是通過ThreadLocal把數(shù)據(jù)庫連接綁定到當前線程中,同一個事務中數(shù)據(jù)庫操作使用同一個jdbc connection,新開啟的線程獲取不到當前jdbc connection。
如下代碼:
partition.forEach(item -> {
poolExecutor.execute(() -> {
saveBatch(item, 1000);
latch.countDown();
//讓每個都報錯
int i = 1/0;
});
});
控制臺打印:
Exception in thread "執(zhí)行線程5" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程2" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程4" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程1" java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Exception in thread "執(zhí)行線程3" 30179
java.lang.ArithmeticException: / by zero
at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
可見5個線程都報錯了,但是去查詢數(shù)據(jù)庫,卻可以查詢到5000條數(shù)據(jù),這是不應該出現(xiàn)的情況。
處理多線程入庫的事務問題
@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
@Resource
private DataSourceTransactionManager dataSourceTransactionManager;
@Resource
private TransactionDefinition transactionDefinition;
@Override
//此處手動管理事務的提交后,這個注解就可以去掉了
// @Transactional(rollbackFor = Exception.class)
public Object doTest() {
long start = System.currentTimeMillis();
//手動創(chuàng)建線程池,注意你 數(shù)據(jù)庫連接池的 允許連接數(shù)量,別超過了就行。
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
5,
5,
30,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10),
//isDaemon 設(shè)置線程是否是守護線程,true的話,主線程結(jié)束,new的線程就不會繼續(xù)工作
new NamedThreadFactory("執(zhí)行線程", false),
(r, executor) -> System.out.println("拒絕" + r));
List<MoreTestEntity> entityList = new ArrayList<>();
for (int i = 0; i < 50; i++) {
MoreTestEntity entity = new MoreTestEntity();
entity.setId((long) i);
entity.setA(UUID.randomUUID().toString());
entity.setB(UUID.randomUUID().toString());
entity.setC(UUID.randomUUID().toString());
entity.setD(UUID.randomUUID().toString());
entity.setE(UUID.randomUUID().toString());
entity.setF(UUID.randomUUID().toString());
entity.setG(UUID.randomUUID().toString());
entity.setH(UUID.randomUUID().toString());
entity.setI(UUID.randomUUID().toString());
entity.setJ(UUID.randomUUID().toString());
entity.setK(UUID.randomUUID().toString());
entityList.add(entity);
}
//拆分list,將其拆分成5份,然后上面線程池創(chuàng)建也是5個核心線程,剛好執(zhí)行
List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 10);
//使用CountDownLatch保證所有線程都執(zhí)行完成
CountDownLatch sonLatch = new CountDownLatch(5);
//主線程的 肯定為1
CountDownLatch mainLatch = new CountDownLatch(1);
AtomicBoolean hasError = new AtomicBoolean(false);
partition.forEach(item -> {
poolExecutor.execute(() -> {
doSave(item, sonLatch, hasError, mainLatch);
});
});
try {
//此處應該是用try catch 包裹著主線程的所有業(yè)務代碼,以此保證主線程中任何一處報錯都可以通知子線程
//這里加一個是為了調(diào)試主線程中的數(shù)據(jù)入庫操作
MoreTestEntity entity = new MoreTestEntity();
entity.setId((long) 99999);
entity.setA(UUID.randomUUID().toString());
entity.setB(UUID.randomUUID().toString());
entity.setC(UUID.randomUUID().toString());
entity.setD(UUID.randomUUID().toString());
entity.setE(UUID.randomUUID().toString());
entity.setF(UUID.randomUUID().toString());
entity.setG(UUID.randomUUID().toString());
entity.setH(UUID.randomUUID().toString());
entity.setI(UUID.randomUUID().toString());
entity.setJ(UUID.randomUUID().toString());
entity.setK(UUID.randomUUID().toString());
save(entity);
//主線程報錯
int i = 10 / 0;
sonLatch.await();
} catch (InterruptedException e) {
hasError.set(true);
e.printStackTrace();
}
mainLatch.countDown();
long end = System.currentTimeMillis();
System.err.println(end - start);
//關(guān)閉線程池
if (!poolExecutor.isShutdown()) {
poolExecutor.shutdown();
}
return end - start;
}
/**
* 包裝后的子線程的保存代碼
*
* @param entityList 要保存的集合
* @param sonLatch 子線程 CountDownLatch
* @param hasError 是否發(fā)生錯誤
* @param mainLatch 主線程 CountDownLatch
*/
private void doSave(List<MoreTestEntity> entityList,
CountDownLatch sonLatch,
AtomicBoolean hasError,
CountDownLatch mainLatch) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
// //子線程報錯
// int i = 10 / 0;
saveBatch(entityList);
} catch (Throwable throwable) {
throwable.printStackTrace();
hasError.set(true);
} finally {
//這是必須的,每個子線程走完,要讓主線程繼續(xù)走,然后再回到子線程的每個任務,決定是提交還是回滾
sonLatch.countDown();
}
try {
//等待主線程的執(zhí)行結(jié)束
mainLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
hasError.set(true);
}
//事務操作
if (hasError.get()) {
dataSourceTransactionManager.rollback(transactionStatus);
} else {
dataSourceTransactionManager.commit(transactionStatus);
}
}
}
分別放開子線程報錯和主線程報錯,會發(fā)現(xiàn)事務都可以正?;貪L,達到了預期的效果。
主要思路就是通過子線程CountDownLatch和主線程CountDownLatch,控制線程好代碼的執(zhí)行順序即可。文章來源:http://www.zghlxwxcb.cn/news/detail-431324.html
最后補充幾點:文章來源地址http://www.zghlxwxcb.cn/news/detail-431324.html
- 上述代碼中的countDown()一旦出現(xiàn)不執(zhí)行的情況那會導致線程堵塞堆積,所以建議給await()增加超時時間
- 這樣操作可能還會出現(xiàn)問題,比如主線程通知子線程可以進行實務操作了,但是各個子線程之間非透明,所以還是有幾率存在某個子線程事務回滾失敗的情況。
到了這里,關(guān)于【SpringBoot】springboot數(shù)據(jù)使用多線程批量入數(shù)據(jù)庫的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!