倉庫地址:?buxingzhe: 一個多數(shù)據(jù)源和多線程事務(wù)練習(xí)項目
小伙伴們在日常編碼中經(jīng)常為了提高程序運行效率采用多線程編程,在不涉及事務(wù)的情況下,使用dou.lea大神提供的CompletableFuture異步編程利器,它提供了許多優(yōu)雅的api,我們可以很方便的進行異步多線程編程,速度杠杠的,在這里感謝大佬可憐我們廣大碼農(nóng)的不易,提供了如此優(yōu)秀的異步編程框架!
? ? ? ?剛才說了,不涉及事務(wù)情況下,用著爽歪歪,一旦涉及到事務(wù),沒有遇到這種情況的就頭疼了,多個線程之間發(fā)生異常,怎么回滾事務(wù)?因為很多業(yè)務(wù)場景使用了多線程編程,涉及到DML操作(select、update、insert、delete)中的增刪改,必須要保持?jǐn)?shù)據(jù)在業(yè)務(wù)上的一致性,比如修改A表,插入B表,這兩步在業(yè)務(wù)上必須是原子的,有一個失敗,對于另外表的操作都必須回滾,而spring中對不同線程的數(shù)據(jù)庫連接是單獨的,放在ThreadLocal中,多個線程之間不共享事務(wù),下面通過幾個淺顯易懂的示例,來解釋不同場景下的多線程報錯以及處理辦法。
?
?可以看到,子線程中寫了拋出異常代碼,但是控制臺沒有打印出,主線程和子線程事務(wù)都未回滾,數(shù)據(jù)正常插入,主線程沒有等子線程執(zhí)行完就結(jié)束。對上面的例子修改下:
?主線程中加入了join(),等待子線程執(zhí)行,這時控制臺打印了子線程拋出的異常如下:
?數(shù)據(jù)庫數(shù)據(jù)如下:
我們看到,主線程方法上由于加了?@Transactional(rollbackFor = Exception.class)聲明式事務(wù)注解,事務(wù)回滾了,數(shù)據(jù)并沒有插入。子線程雖然拋出異常,但是事務(wù)沒有回滾,數(shù)據(jù)正常插入了!這不是我們想要的結(jié)果,再繼續(xù)改進下:
先注入一個事務(wù)管理器
?然后在子線程中加入編程式事務(wù)代碼,手動管理子線程事務(wù)狀態(tài),發(fā)生異常后,回滾子線程事務(wù),并拋出異常至主線程中(直接貼代碼了,方便復(fù)制粘貼)
@Override
@Transactional(rollbackFor = Exception.class)
public void insert() {
System.out.println("主線程為:"+Thread.currentThread().getName());
List<User> list = new ArrayList<>(){{
add(new User("1","張三"));
}};
String sql = "insert into user(id,name) values (?,?)";
jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//多線程異步操作
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// 獲得事務(wù)狀態(tài)
TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
try {
System.out.println("子線程1為:"+Thread.currentThread().getName());
List<User> syncList = new ArrayList<>(){{
add(new User("2","李四"));
}};
jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//此異常必拋出,模擬拋出異常
if(1<2){
throw new RuntimeException("子線程發(fā)生異常");
}
//開啟手動事務(wù)管理后,必須手動在邏輯結(jié)束時提交事務(wù),否則會造成鎖表,查詢可以,增刪改會卡住,除非重啟服務(wù)斷開與數(shù)據(jù)庫的連接
dataSourceTransactionManager.commit(status);
}catch (Exception e){
//發(fā)生異常時手動回滾子線程事務(wù)
dataSourceTransactionManager.rollback(status);
//拋出異常供主線程捕獲
throw new RuntimeException(e.getMessage());
}
}).exceptionally(throwable -> {
throw new RuntimeException(throwable.getCause().getMessage());
});
//必須等待子線程執(zhí)行完,拋出異常才能回滾主線程的事務(wù)
future.join();
}
執(zhí)行結(jié)果如下:
?數(shù)據(jù)庫數(shù)據(jù)如下:
?可以看到主線程和子線程中的數(shù)據(jù)都回滾了!
以上都還是較為簡單的場景,那如果異常是在主線程中發(fā)生或者在其他子線程發(fā)生,那所有線程中的事務(wù)如何回滾呢?請看示例
@Override
@Transactional(rollbackFor = Exception.class)
public void insert() {
System.out.println("主線程為:"+Thread.currentThread().getName());
List<User> list = new ArrayList<>(){{
add(new User("1","張三"));
}};
String sql = "insert into user(id,name) values (?,?)";
jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//線程一拋出異常
CompletableFuture<Void> futureOne = getFutureOne();
//線程二無異常
CompletableFuture<Void> futureTwo = getFutureTwo();
//必須等待所有子線程執(zhí)行完,拋出異常才能回滾主線程的事務(wù)
CompletableFuture.allOf(futureOne,futureTwo).join();
}
public CompletableFuture<Void> getFutureOne(){
return CompletableFuture.runAsync(()->{
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// 獲得事務(wù)狀態(tài)
TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
try {
System.out.println("子線程1為:"+Thread.currentThread().getName());
List<User> syncList = new ArrayList<>(){{
add(new User("2","李四"));
}};
jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//此異常必拋出,模擬拋出異常
if(1<2){
throw new RuntimeException("子線程1發(fā)生異常");
}
//開啟手動事務(wù)管理后,必須手動在邏輯結(jié)束時提交事務(wù),否則會造成鎖表,查詢可以,增刪改會卡住,除非重啟服務(wù),斷開與數(shù)據(jù)庫的連接
dataSourceTransactionManager.commit(status);
}catch (Exception e){
//發(fā)生異常時手動回滾子線程事務(wù)
dataSourceTransactionManager.rollback(status);
//拋出異常供主線程捕獲
throw new RuntimeException(e.getMessage());
}
}).exceptionally(throwable -> {
throw new RuntimeException(throwable.getCause().getMessage());
});
}
public CompletableFuture<Void> getFutureTwo(){
return CompletableFuture.runAsync(()->{
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// 獲得事務(wù)狀態(tài)
TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
try {
System.out.println("子線程2為:"+Thread.currentThread().getName());
List<User> syncList = new ArrayList<>(){{
add(new User("3","王五"));
}};
jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//開啟手動事務(wù)管理后,必須手動在邏輯結(jié)束時提交事務(wù),否則會造成鎖表,查詢可以,增刪改會卡住,除非重啟服務(wù),斷開與數(shù)據(jù)庫的連接
dataSourceTransactionManager.commit(status);
}catch (Exception e){
//發(fā)生異常時手動回滾子線程事務(wù)
dataSourceTransactionManager.rollback(status);
//拋出異常供主線程捕獲
throw new RuntimeException(e.getMessage());
}
}).exceptionally(throwable -> {
throw new RuntimeException(throwable.getCause().getMessage());
});
}
上面的代碼中主線程調(diào)用了兩個異步子線程,其中子線程一拋出異常,子線程二無異常,主線程阻塞等待兩個子線程執(zhí)行結(jié)果
?可以看到異常打印在控制臺,且只有主線程和線程一的數(shù)據(jù)回滾
下面再修改下,實現(xiàn)當(dāng)子線程有異常拋出時,保證主線程和其他子線程也同步回滾:
@Override
@Transactional(rollbackFor = Exception.class)
public void insert() {
List<TransactionStatus> statusList = new Vector<>();
try {
System.out.println("主線程為:"+Thread.currentThread().getName());
List<User> list = new ArrayList<>(){{
add(new User("1","張三"));
}};
String sql = "insert into user(id,name) values (?,?)";
jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//線程一拋出異常
CompletableFuture<Void> futureOne = getFutureOne(statusList);
//線程二無異常
CompletableFuture<Void> futureTwo = getFutureTwo(statusList);
//必須等待所有子線程執(zhí)行完,拋出異常才能回滾主線程的事務(wù)
CompletableFuture.allOf(futureOne,futureTwo).join();
statusList.forEach(dataSourceTransactionManager::commit);
}catch (Exception e){
statusList.forEach(dataSourceTransactionManager::rollback);
throw new RuntimeException(e.getMessage());
}
}
public CompletableFuture<Void> getFutureOne(List<TransactionStatus> statusList){
return CompletableFuture.runAsync(()->{
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// 獲得事務(wù)狀態(tài)
TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
try {
System.out.println("子線程1為:"+Thread.currentThread().getName());
List<User> syncList = new ArrayList<>(){{
add(new User("2","李四"));
}};
jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//此異常必拋出,模擬拋出異常
if(1<2){
throw new RuntimeException("子線程1發(fā)生異常");
}
//dataSourceTransactionManager.commit(status);
}catch (Exception e){
//拋出異常供主線程捕獲
//dataSourceTransactionManager.rollback(status);
throw new RuntimeException(e.getMessage());
}finally {
statusList.add(status);
}
}).exceptionally(throwable -> {
throw new RuntimeException(throwable.getCause().getMessage());
});
}
public CompletableFuture<Void> getFutureTwo(List<TransactionStatus> statusList){
return CompletableFuture.runAsync(()->{
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// 獲得事務(wù)狀態(tài)
TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
try {
System.out.println("子線程2為:"+Thread.currentThread().getName());
List<User> syncList = new ArrayList<>(){{
add(new User("3","王五"));
}};
jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
ps.setString(1, d.getId());
ps.setString(2,d.getName());
});
//dataSourceTransactionManager.commit(status);
}catch (Exception e){
//dataSourceTransactionManager.rollback(status);
throw new RuntimeException(e.getMessage());
}finally {
statusList.add(status);
}
}).exceptionally(throwable -> {
throw new RuntimeException(throwable.getCause().getMessage());
});
}
用線程安全的集合Vector收集子線程的事務(wù)狀態(tài),子線程不做commit和rollback,調(diào)用后報錯如下:
?No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]
解釋: 無法在當(dāng)前線程綁定的threadLocal中尋找到HikariDataSource作為key,對應(yīng)關(guān)聯(lián)的資源對象ConnectionHolder
spring中一次事務(wù)的完成通常都是默認(rèn)在當(dāng)前線程內(nèi)完成的,又因為一次事務(wù)的執(zhí)行過程中,涉及到對當(dāng)前數(shù)據(jù)庫連接Connection的操作,因此為了避免將Connection在事務(wù)執(zhí)行過程中來回傳遞,我們可以將Connextion綁定到當(dāng)前事務(wù)執(zhí)行線程對應(yīng)的ThreadLocalMap內(nèi)部,順便還可以將一些其他屬性也放入其中進行保存,在Spring中,負(fù)責(zé)保存這些ThreadLocal屬性的實現(xiàn)類由TransactionSynchronizationManager承擔(dān)。
TransactionSynchronizationManager類內(nèi)部默認(rèn)提供了下面六個ThreadLocal屬性,分別保存當(dāng)前線程對應(yīng)的不同事務(wù)資源:
//保存當(dāng)前事務(wù)關(guān)聯(lián)的資源--默認(rèn)只會在新建事務(wù)的時候保存當(dāng)前獲取到的DataSource和當(dāng)前事務(wù)對應(yīng)Connection的映射關(guān)系--當(dāng)然這里Connection被包裝為了ConnectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
//事務(wù)監(jiān)聽者--在事務(wù)執(zhí)行到某個階段的過程中,會去回調(diào)監(jiān)聽者對應(yīng)的回調(diào)接口(典型觀察者模式的應(yīng)用)---默認(rèn)為空集合
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
//見名知意: 存放當(dāng)前事務(wù)名字
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
//見名知意: 存放當(dāng)前事務(wù)是否是只讀事務(wù)
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
//見名知意: 存放當(dāng)前事務(wù)的隔離級別
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
//見名知意: 存放當(dāng)前事務(wù)是否處于激活狀態(tài)
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
那么上面拋出的異常的原因也就很清楚了,無法在main線程找到當(dāng)前事務(wù)對應(yīng)的資源,原因如下:
主線程為:http-nio-5566-exec-2
子線程1為:ForkJoinPool.commonPool-worker-1
子線程2為:ForkJoinPool.commonPool-worker-2
開啟新事務(wù)時,事務(wù)相關(guān)資源都被綁定到了http-nio-5566-exec-2線程對應(yīng)的threadLocalMap內(nèi)部,而當(dāng)執(zhí)行事務(wù)提交代碼時,commit內(nèi)部需要從TransactionSynchronizationManager中獲取當(dāng)前事務(wù)的資源,顯然我們無法從main線程對應(yīng)的threadLocalMap中獲取到對應(yīng)的事務(wù)資源,這就是異常拋出的原因。
下面介紹一種可用的多線程事務(wù)回滾方式,但是對編程順序有要求,小伙伴們可以按需使用。
首先提供一個多線程事務(wù)管理類:
import lombok.extern.slf4j.Slf4j; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * 多線程事務(wù)管理器 * @Title: MultiThreadingTransactionManager * @Description: TODO * @author: hulei * @date: 2023/8/7 11:36 * @Version: 1.0 */ @Slf4j public class MultiThreadingTransactionManager { /** * 事務(wù)管理器 */ private final PlatformTransactionManager transactionManager; /** * 超時時間 */ private final long timeout; /** * 時間單位 */ private final TimeUnit unit; /** * 一階段門閂,(第一階段的準(zhǔn)備階段),當(dāng)所有子線程準(zhǔn)備完成時(除“提交/回滾”操作以外的工作都完成),countDownLatch的值為0 */ private CountDownLatch oneStageLatch = null; /** * 二階段門閂,(第二階段的執(zhí)行執(zhí)行),主線程將不再等待子線程執(zhí)行,直接判定總的任務(wù)執(zhí)行失敗,執(zhí)行第二階段讓等待確認(rèn)的線程進行回滾 */ private final CountDownLatch twoStageLatch = new CountDownLatch(1); /** * 是否提交事務(wù),默認(rèn)是true(當(dāng)任一線程發(fā)生異常時,isSubmit會被設(shè)置為false,即回滾事務(wù)) */ private final AtomicBoolean isSubmit = new AtomicBoolean(true); /** * 構(gòu)造方法 * * @param transactionManager 事務(wù)管理器 * @param timeout 超時時間 * @param unit 時間單位 */ public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) { this.transactionManager = transactionManager; this.timeout = timeout; this.unit = unit; } /** * 線程池方式執(zhí)行任務(wù),可保證線程間的事務(wù)一致性 * * @param runnableList 任務(wù)列表 * @param executor 線程池 */ public void execute(List<Runnable> runnableList, ExecutorService executor) { // 排除null值 runnableList.removeAll(Collections.singleton(null)); // 屬性初始化 innit(runnableList.size()); // 遍歷任務(wù)列表并放入線程池 for (Runnable runnable : runnableList) { // 創(chuàng)建線程 Thread thread = new Thread(() -> { // 如果別的線程執(zhí)行失敗,則該任務(wù)就不需要再執(zhí)行了 if (!isSubmit.get()) { log.info("當(dāng)前子線程執(zhí)行中止,因為線程事務(wù)中有子線程執(zhí)行失敗"); oneStageLatch.countDown(); return; } // 開啟事務(wù) TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition()); try { // 執(zhí)行業(yè)務(wù)邏輯 runnable.run(); } catch (Exception e) { // 執(zhí)行體發(fā)生異常,設(shè)置回滾 isSubmit.set(false); log.error("線程{}:業(yè)務(wù)發(fā)生異常,執(zhí)行體:{}", Thread.currentThread().getName(), runnable); } // 計數(shù)器減一 oneStageLatch.countDown(); try { //等待所有線程任務(wù)完成,監(jiān)控是否有異常,有則統(tǒng)一回滾 twoStageLatch.await(); // 根據(jù)isSubmit值判斷事務(wù)是否提交,可能是子線程出現(xiàn)異常,也有可能是子線程執(zhí)行超時 if (isSubmit.get()) { // 提交 transactionManager.commit(transactionStatus); log.info("線程{}:事務(wù)提交成功,執(zhí)行體:{}", Thread.currentThread().getName(), runnable); } else { // 回滾 transactionManager.rollback(transactionStatus); log.info("線程{}:事務(wù)回滾成功,執(zhí)行體:{}", Thread.currentThread().getName(), runnable); } } catch (InterruptedException e) { log.error("子線程拋出異常:{}",e.getMessage()); } }); executor.execute(thread); } //主線程擔(dān)任協(xié)調(diào)者,當(dāng)?shù)谝浑A段所有參與者準(zhǔn)備完成,oneStageLatch的計數(shù)為0 //主線程發(fā)起第二階段,執(zhí)行階段(提交或回滾) try { // 主線程等待所有線程執(zhí)行完成,超時時間設(shè)置為五秒,超出等待時間則返回false,計數(shù)為0返回true boolean timeOutFlag = oneStageLatch.await(timeout, unit); long count = oneStageLatch.getCount(); // 主線程等待超時,子線程可能發(fā)生長時間阻塞,死鎖 if (count > 0 || !timeOutFlag) { // 設(shè)置為回滾 isSubmit.set(false); log.info("主線線程等待超時,任務(wù)即將全部回滾"); throw new RuntimeException("主線線程等待超時,任務(wù)即將全部回滾"); } twoStageLatch.countDown(); } catch (InterruptedException e) { log.error(e.getMessage()); throw new RuntimeException(e.getMessage()); } // 返回結(jié)果,是否執(zhí)行成功,事務(wù)提交即為執(zhí)行成功,事務(wù)回滾即為執(zhí)行失敗 boolean flag = isSubmit.get(); if(!flag){ log.info("有線程發(fā)生異常,事務(wù)全部回滾"); throw new RuntimeException("有線程發(fā)生異常,數(shù)據(jù)全部回滾"); }else{ log.info("主線程和子線程執(zhí)行無異常,事務(wù)全部提交"); } executor.shutdown(); } /** * 初始化屬性 * * @param size 任務(wù)數(shù)量 */ private void innit(int size) { oneStageLatch = new CountDownLatch(size); } }
?看下調(diào)用代碼示例
import com.hulei.studyproject.entity.User; import com.hulei.studyproject.threadpool.ThreadPoolUtil; import com.hulei.studyproject.transaction.MultiThreadingTransactionManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 測試多線程事務(wù)回滾 * @Title: UserServiceImpl * @Description: TODO * @author: hulei * @date: 2023/7/31 17:41 * @Version: 1.0 */ @Service @RequiredArgsConstructor @Slf4j public class UserServiceImpl implements IUserService{ private final JdbcTemplate jdbcTemplate; private final PlatformTransactionManager platformTransactionManager; @Override @Transactional(rollbackFor = Exception.class) public void insert() { System.out.println("主線程為:"+Thread.currentThread().getName()); List<User> list = new ArrayList<>(){{ add(new User("1","張三")); }}; String sql = "insert into user(id,name) values (?,?)"; jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{ ps.setString(1, d.getId()); ps.setString(2,d.getName()); }); List<Runnable> runnableList = getRunnables(); MultiThreadingTransactionManager multiThreadingTransactionManager = new MultiThreadingTransactionManager(platformTransactionManager,5, TimeUnit.SECONDS); ThreadPoolExecutor executor = ThreadPoolUtil.getThreadPool(); multiThreadingTransactionManager.execute(runnableList,executor); } private List<Runnable> getRunnables() { List<Runnable> runnableList = new ArrayList<>(); runnableList.add(()->{ System.out.println("子線程1為:"+Thread.currentThread().getName()); List<User> listOne = new ArrayList<>(){{ add(new User("2","李四")); }}; String sqlOne = "insert into user(id,name) values (?,?)"; jdbcTemplate.batchUpdate(sqlOne,listOne, listOne.size(), (ps,d)->{ ps.setString(1, d.getId()); ps.setString(2,d.getName()); }); int a = 10/0; }); runnableList.add(()->{ System.out.println("子線程2為:"+Thread.currentThread().getName()); List<User> listTwo = new ArrayList<>(){{ add(new User("3","王五")); }}; String sqlTwo = "insert into user(id,name) values (?,?)"; jdbcTemplate.batchUpdate(sqlTwo,listTwo, listTwo.size(), (ps,d)->{ ps.setString(1, d.getId()); ps.setString(2,d.getName()); }); }); return runnableList; } }
我們在其中一個子線程處手工寫了一個會拋出異常的代碼
?執(zhí)行結(jié)果如下:
?可以看到控制臺報錯,數(shù)據(jù)庫執(zhí)行結(jié)果如下:
主線程和子線程數(shù)據(jù)均未生成
把異常代碼注釋掉
執(zhí)行結(jié)果如下
?無異常拋出,數(shù)據(jù)庫結(jié)果如下:
?可以看到子線程和主線程操作的數(shù)據(jù)均已回滾。
但是以上方法有一定局限性,即主線程如果再子線程執(zhí)行后再拋出異常,則子線程無法回滾了,所以要求邏輯寫在子線程執(zhí)行之前
文章來源:http://www.zghlxwxcb.cn/news/detail-815437.html
?2處的代碼必須放在方法最后寫。文章來源地址http://www.zghlxwxcb.cn/news/detail-815437.html
到了這里,關(guān)于CompletableFuture異步編程事務(wù)及多數(shù)據(jù)源配置詳解(含gitee源碼)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!