国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

CompletableFuture異步編程事務(wù)及多數(shù)據(jù)源配置詳解(含gitee源碼)

這篇具有很好參考價值的文章主要介紹了CompletableFuture異步編程事務(wù)及多數(shù)據(jù)源配置詳解(含gitee源碼)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

倉庫地址:?buxingzhe: 一個多數(shù)據(jù)源和多線程事務(wù)練習(xí)項目

小伙伴們在日常編碼中經(jīng)常為了提高程序運行效率采用多線程編程,在不涉及事務(wù)的情況下,使用dou.lea大神提供的CompletableFuture異步編程利器,它提供了許多優(yōu)雅的api,我們可以很方便的進行異步多線程編程,速度杠杠的,在這里感謝大佬可憐我們廣大碼農(nóng)的不易,提供了如此優(yōu)秀的異步編程框架!

? ? ? ?剛才說了,不涉及事務(wù)情況下,用著爽歪歪,一旦涉及到事務(wù),沒有遇到這種情況的就頭疼了,多個線程之間發(fā)生異常,怎么回滾事務(wù)?因為很多業(yè)務(wù)場景使用了多線程編程,涉及到DML操作(select、update、insert、delete)中的增刪改,必須要保持?jǐn)?shù)據(jù)在業(yè)務(wù)上的一致性,比如修改A表,插入B表,這兩步在業(yè)務(wù)上必須是原子的,有一個失敗,對于另外表的操作都必須回滾,而spring中對不同線程的數(shù)據(jù)庫連接是單獨的,放在ThreadLocal中,多個線程之間不共享事務(wù),下面通過幾個淺顯易懂的示例,來解釋不同場景下的多線程報錯以及處理辦法。

事務(wù)中使用compleablefuture,java,開發(fā)語言

?事務(wù)中使用compleablefuture,java,開發(fā)語言

事務(wù)中使用compleablefuture,java,開發(fā)語言

?可以看到,子線程中寫了拋出異常代碼,但是控制臺沒有打印出,主線程和子線程事務(wù)都未回滾,數(shù)據(jù)正常插入,主線程沒有等子線程執(zhí)行完就結(jié)束。對上面的例子修改下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

?主線程中加入了join(),等待子線程執(zhí)行,這時控制臺打印了子線程拋出的異常如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

?數(shù)據(jù)庫數(shù)據(jù)如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

我們看到,主線程方法上由于加了?@Transactional(rollbackFor = Exception.class)聲明式事務(wù)注解,事務(wù)回滾了,數(shù)據(jù)并沒有插入。子線程雖然拋出異常,但是事務(wù)沒有回滾,數(shù)據(jù)正常插入了!這不是我們想要的結(jié)果,再繼續(xù)改進下:

先注入一個事務(wù)管理器

事務(wù)中使用compleablefuture,java,開發(fā)語言

?然后在子線程中加入編程式事務(wù)代碼,手動管理子線程事務(wù)狀態(tài),發(fā)生異常后,回滾子線程事務(wù),并拋出異常至主線程中(直接貼代碼了,方便復(fù)制粘貼)

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void insert() {
        System.out.println("主線程為:"+Thread.currentThread().getName());
        List<User> list = new ArrayList<>(){{
            add(new User("1","張三"));
        }};
        String sql = "insert into user(id,name) values (?,?)";
        jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{
            ps.setString(1, d.getId());
            ps.setString(2,d.getName());
        });
        //多線程異步操作
        CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // 獲得事務(wù)狀態(tài)
            TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
            try {
                System.out.println("子線程1為:"+Thread.currentThread().getName());
                List<User> syncList = new ArrayList<>(){{
                    add(new User("2","李四"));
                }};
                jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
                    ps.setString(1, d.getId());
                    ps.setString(2,d.getName());
                });
                //此異常必拋出,模擬拋出異常
                if(1<2){
                    throw new RuntimeException("子線程發(fā)生異常");
                }
                //開啟手動事務(wù)管理后,必須手動在邏輯結(jié)束時提交事務(wù),否則會造成鎖表,查詢可以,增刪改會卡住,除非重啟服務(wù)斷開與數(shù)據(jù)庫的連接
                dataSourceTransactionManager.commit(status);
            }catch (Exception e){
                //發(fā)生異常時手動回滾子線程事務(wù)
                dataSourceTransactionManager.rollback(status);
                //拋出異常供主線程捕獲
                throw new RuntimeException(e.getMessage());
            }
        }).exceptionally(throwable -> {
            throw new RuntimeException(throwable.getCause().getMessage());
        });
        //必須等待子線程執(zhí)行完,拋出異常才能回滾主線程的事務(wù)
        future.join();
    }

執(zhí)行結(jié)果如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

?數(shù)據(jù)庫數(shù)據(jù)如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

?可以看到主線程和子線程中的數(shù)據(jù)都回滾了!

以上都還是較為簡單的場景,那如果異常是在主線程中發(fā)生或者在其他子線程發(fā)生,那所有線程中的事務(wù)如何回滾呢?請看示例

@Override
    @Transactional(rollbackFor = Exception.class)
    public void insert() {
        System.out.println("主線程為:"+Thread.currentThread().getName());
        List<User> list = new ArrayList<>(){{
            add(new User("1","張三"));
        }};
        String sql = "insert into user(id,name) values (?,?)";
        jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{
            ps.setString(1, d.getId());
            ps.setString(2,d.getName());
        });
        //線程一拋出異常
        CompletableFuture<Void> futureOne = getFutureOne();
        //線程二無異常
        CompletableFuture<Void> futureTwo = getFutureTwo();
        //必須等待所有子線程執(zhí)行完,拋出異常才能回滾主線程的事務(wù)
        CompletableFuture.allOf(futureOne,futureTwo).join();
    }

    public CompletableFuture<Void> getFutureOne(){
        return CompletableFuture.runAsync(()->{
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // 獲得事務(wù)狀態(tài)
            TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
            try {
                System.out.println("子線程1為:"+Thread.currentThread().getName());
                List<User> syncList = new ArrayList<>(){{
                    add(new User("2","李四"));
                }};
                jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
                    ps.setString(1, d.getId());
                    ps.setString(2,d.getName());
                });
                //此異常必拋出,模擬拋出異常
                if(1<2){
                    throw new RuntimeException("子線程1發(fā)生異常");
                }
                //開啟手動事務(wù)管理后,必須手動在邏輯結(jié)束時提交事務(wù),否則會造成鎖表,查詢可以,增刪改會卡住,除非重啟服務(wù),斷開與數(shù)據(jù)庫的連接
                dataSourceTransactionManager.commit(status);
            }catch (Exception e){
                //發(fā)生異常時手動回滾子線程事務(wù)
                dataSourceTransactionManager.rollback(status);
                //拋出異常供主線程捕獲
                throw new RuntimeException(e.getMessage());
            }
        }).exceptionally(throwable -> {
            throw new RuntimeException(throwable.getCause().getMessage());
        });
    }

    public CompletableFuture<Void> getFutureTwo(){
        return CompletableFuture.runAsync(()->{
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // 獲得事務(wù)狀態(tài)
            TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
            try {
                System.out.println("子線程2為:"+Thread.currentThread().getName());
                List<User> syncList = new ArrayList<>(){{
                    add(new User("3","王五"));
                }};
                jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
                    ps.setString(1, d.getId());
                    ps.setString(2,d.getName());
                });
                //開啟手動事務(wù)管理后,必須手動在邏輯結(jié)束時提交事務(wù),否則會造成鎖表,查詢可以,增刪改會卡住,除非重啟服務(wù),斷開與數(shù)據(jù)庫的連接
                dataSourceTransactionManager.commit(status);
            }catch (Exception e){
                //發(fā)生異常時手動回滾子線程事務(wù)
                dataSourceTransactionManager.rollback(status);
                //拋出異常供主線程捕獲
                throw new RuntimeException(e.getMessage());
            }
        }).exceptionally(throwable -> {
            throw new RuntimeException(throwable.getCause().getMessage());
        });
    }

上面的代碼中主線程調(diào)用了兩個異步子線程,其中子線程一拋出異常,子線程二無異常,主線程阻塞等待兩個子線程執(zhí)行結(jié)果

事務(wù)中使用compleablefuture,java,開發(fā)語言

?可以看到異常打印在控制臺,且只有主線程和線程一的數(shù)據(jù)回滾

事務(wù)中使用compleablefuture,java,開發(fā)語言

下面再修改下,實現(xiàn)當(dāng)子線程有異常拋出時,保證主線程和其他子線程也同步回滾:

 @Override
    @Transactional(rollbackFor = Exception.class)
    public void insert() {
        List<TransactionStatus> statusList = new Vector<>();
        try {
            System.out.println("主線程為:"+Thread.currentThread().getName());
            List<User> list = new ArrayList<>(){{
                add(new User("1","張三"));
            }};
            String sql = "insert into user(id,name) values (?,?)";
            jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{
                ps.setString(1, d.getId());
                ps.setString(2,d.getName());
            });
            //線程一拋出異常
            CompletableFuture<Void> futureOne = getFutureOne(statusList);
            //線程二無異常
            CompletableFuture<Void> futureTwo = getFutureTwo(statusList);
            //必須等待所有子線程執(zhí)行完,拋出異常才能回滾主線程的事務(wù)
            CompletableFuture.allOf(futureOne,futureTwo).join();
            statusList.forEach(dataSourceTransactionManager::commit);
        }catch (Exception e){
            statusList.forEach(dataSourceTransactionManager::rollback);
            throw new RuntimeException(e.getMessage());
        }
    }

    public CompletableFuture<Void> getFutureOne(List<TransactionStatus> statusList){
        return CompletableFuture.runAsync(()->{
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // 獲得事務(wù)狀態(tài)
            TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
            try {
                System.out.println("子線程1為:"+Thread.currentThread().getName());
                List<User> syncList = new ArrayList<>(){{
                    add(new User("2","李四"));
                }};
                jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
                    ps.setString(1, d.getId());
                    ps.setString(2,d.getName());
                });
                //此異常必拋出,模擬拋出異常
                if(1<2){
                    throw new RuntimeException("子線程1發(fā)生異常");
                }
                //dataSourceTransactionManager.commit(status);
            }catch (Exception e){
                //拋出異常供主線程捕獲
                //dataSourceTransactionManager.rollback(status);
                throw new RuntimeException(e.getMessage());
            }finally {
                statusList.add(status);
            }
        }).exceptionally(throwable -> {
            throw new RuntimeException(throwable.getCause().getMessage());
        });
    }

    public CompletableFuture<Void> getFutureTwo(List<TransactionStatus> statusList){
        return CompletableFuture.runAsync(()->{
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            // 事物隔離級別,開啟新事務(wù),這樣會比較安全些。
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            // 獲得事務(wù)狀態(tài)
            TransactionStatus status = dataSourceTransactionManager.getTransaction(def);
            try {
                System.out.println("子線程2為:"+Thread.currentThread().getName());
                List<User> syncList = new ArrayList<>(){{
                    add(new User("3","王五"));
                }};
                jdbcTemplate.batchUpdate("insert into user(id,name) values (?,?)",syncList, syncList.size(), (ps,d)->{
                    ps.setString(1, d.getId());
                    ps.setString(2,d.getName());
                });
                //dataSourceTransactionManager.commit(status);
            }catch (Exception e){
                //dataSourceTransactionManager.rollback(status);
                throw new RuntimeException(e.getMessage());
            }finally {
                statusList.add(status);
            }
        }).exceptionally(throwable -> {
            throw new RuntimeException(throwable.getCause().getMessage());
        });
    }

用線程安全的集合Vector收集子線程的事務(wù)狀態(tài),子線程不做commit和rollback,調(diào)用后報錯如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

?No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]
解釋: 無法在當(dāng)前線程綁定的threadLocal中尋找到HikariDataSource作為key,對應(yīng)關(guān)聯(lián)的資源對象ConnectionHolder

spring中一次事務(wù)的完成通常都是默認(rèn)在當(dāng)前線程內(nèi)完成的,又因為一次事務(wù)的執(zhí)行過程中,涉及到對當(dāng)前數(shù)據(jù)庫連接Connection的操作,因此為了避免將Connection在事務(wù)執(zhí)行過程中來回傳遞,我們可以將Connextion綁定到當(dāng)前事務(wù)執(zhí)行線程對應(yīng)的ThreadLocalMap內(nèi)部,順便還可以將一些其他屬性也放入其中進行保存,在Spring中,負(fù)責(zé)保存這些ThreadLocal屬性的實現(xiàn)類由TransactionSynchronizationManager承擔(dān)。

TransactionSynchronizationManager類內(nèi)部默認(rèn)提供了下面六個ThreadLocal屬性,分別保存當(dāng)前線程對應(yīng)的不同事務(wù)資源:

   //保存當(dāng)前事務(wù)關(guān)聯(lián)的資源--默認(rèn)只會在新建事務(wù)的時候保存當(dāng)前獲取到的DataSource和當(dāng)前事務(wù)對應(yīng)Connection的映射關(guān)系--當(dāng)然這里Connection被包裝為了ConnectionHolder
	private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");
    //事務(wù)監(jiān)聽者--在事務(wù)執(zhí)行到某個階段的過程中,會去回調(diào)監(jiān)聽者對應(yīng)的回調(diào)接口(典型觀察者模式的應(yīng)用)---默認(rèn)為空集合
	private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
			new NamedThreadLocal<>("Transaction synchronizations");
   //見名知意: 存放當(dāng)前事務(wù)名字
	private static final ThreadLocal<String> currentTransactionName =
			new NamedThreadLocal<>("Current transaction name");
   //見名知意: 存放當(dāng)前事務(wù)是否是只讀事務(wù)
	private static final ThreadLocal<Boolean> currentTransactionReadOnly =
			new NamedThreadLocal<>("Current transaction read-only status");
   //見名知意: 存放當(dāng)前事務(wù)的隔離級別
	private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
			new NamedThreadLocal<>("Current transaction isolation level");
   //見名知意: 存放當(dāng)前事務(wù)是否處于激活狀態(tài)
	private static final ThreadLocal<Boolean> actualTransactionActive =
			new NamedThreadLocal<>("Actual transaction active");

那么上面拋出的異常的原因也就很清楚了,無法在main線程找到當(dāng)前事務(wù)對應(yīng)的資源,原因如下:

主線程為:http-nio-5566-exec-2
子線程1為:ForkJoinPool.commonPool-worker-1
子線程2為:ForkJoinPool.commonPool-worker-2

開啟新事務(wù)時,事務(wù)相關(guān)資源都被綁定到了http-nio-5566-exec-2線程對應(yīng)的threadLocalMap內(nèi)部,而當(dāng)執(zhí)行事務(wù)提交代碼時,commit內(nèi)部需要從TransactionSynchronizationManager中獲取當(dāng)前事務(wù)的資源,顯然我們無法從main線程對應(yīng)的threadLocalMap中獲取到對應(yīng)的事務(wù)資源,這就是異常拋出的原因。

下面介紹一種可用的多線程事務(wù)回滾方式,但是對編程順序有要求,小伙伴們可以按需使用。

首先提供一個多線程事務(wù)管理類:

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 多線程事務(wù)管理器
 * @Title: MultiThreadingTransactionManager
 * @Description: TODO
 * @author: hulei
 * @date: 2023/8/7 11:36
 * @Version: 1.0
 */
@Slf4j
public class MultiThreadingTransactionManager {

    /**
     * 事務(wù)管理器
     */
    private final PlatformTransactionManager transactionManager;

    /**
     * 超時時間
     */
    private final long timeout;

    /**
     * 時間單位
     */
    private final TimeUnit unit;

    /**
     * 一階段門閂,(第一階段的準(zhǔn)備階段),當(dāng)所有子線程準(zhǔn)備完成時(除“提交/回滾”操作以外的工作都完成),countDownLatch的值為0
     */
    private CountDownLatch oneStageLatch = null;

    /**
     * 二階段門閂,(第二階段的執(zhí)行執(zhí)行),主線程將不再等待子線程執(zhí)行,直接判定總的任務(wù)執(zhí)行失敗,執(zhí)行第二階段讓等待確認(rèn)的線程進行回滾
     */
    private final CountDownLatch twoStageLatch = new CountDownLatch(1);

    /**
     * 是否提交事務(wù),默認(rèn)是true(當(dāng)任一線程發(fā)生異常時,isSubmit會被設(shè)置為false,即回滾事務(wù))
     */
    private final AtomicBoolean isSubmit = new AtomicBoolean(true);

    /**
     * 構(gòu)造方法
     *
     * @param transactionManager 事務(wù)管理器
     * @param timeout            超時時間
     * @param unit               時間單位
     */
    public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {
        this.transactionManager = transactionManager;
        this.timeout = timeout;
        this.unit = unit;
    }

    /**
     * 線程池方式執(zhí)行任務(wù),可保證線程間的事務(wù)一致性
     *
     * @param runnableList 任務(wù)列表
     * @param executor     線程池
     */
    public void execute(List<Runnable> runnableList, ExecutorService executor) {
        // 排除null值
        runnableList.removeAll(Collections.singleton(null));
        // 屬性初始化
        innit(runnableList.size());
        // 遍歷任務(wù)列表并放入線程池
        for (Runnable runnable : runnableList) {
            // 創(chuàng)建線程
            Thread thread = new Thread(() -> {
                // 如果別的線程執(zhí)行失敗,則該任務(wù)就不需要再執(zhí)行了
                if (!isSubmit.get()) {
                    log.info("當(dāng)前子線程執(zhí)行中止,因為線程事務(wù)中有子線程執(zhí)行失敗");
                    oneStageLatch.countDown();
                    return;
                }
                // 開啟事務(wù)
                TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
                try {
                    // 執(zhí)行業(yè)務(wù)邏輯
                    runnable.run();
                } catch (Exception e) {
                    // 執(zhí)行體發(fā)生異常,設(shè)置回滾
                    isSubmit.set(false);
                    log.error("線程{}:業(yè)務(wù)發(fā)生異常,執(zhí)行體:{}", Thread.currentThread().getName(), runnable);
                }
                // 計數(shù)器減一
                oneStageLatch.countDown();
                try {
                    //等待所有線程任務(wù)完成,監(jiān)控是否有異常,有則統(tǒng)一回滾
                    twoStageLatch.await();
                    // 根據(jù)isSubmit值判斷事務(wù)是否提交,可能是子線程出現(xiàn)異常,也有可能是子線程執(zhí)行超時
                    if (isSubmit.get()) {
                        // 提交
                        transactionManager.commit(transactionStatus);
                        log.info("線程{}:事務(wù)提交成功,執(zhí)行體:{}", Thread.currentThread().getName(), runnable);
                    } else {
                        // 回滾
                        transactionManager.rollback(transactionStatus);
                        log.info("線程{}:事務(wù)回滾成功,執(zhí)行體:{}", Thread.currentThread().getName(), runnable);
                    }
                } catch (InterruptedException e) {
                    log.error("子線程拋出異常:{}",e.getMessage());
                }
            });
            executor.execute(thread);
        }

        //主線程擔(dān)任協(xié)調(diào)者,當(dāng)?shù)谝浑A段所有參與者準(zhǔn)備完成,oneStageLatch的計數(shù)為0
        //主線程發(fā)起第二階段,執(zhí)行階段(提交或回滾)
        try {
            // 主線程等待所有線程執(zhí)行完成,超時時間設(shè)置為五秒,超出等待時間則返回false,計數(shù)為0返回true
            boolean timeOutFlag = oneStageLatch.await(timeout, unit);
            long count = oneStageLatch.getCount();
            // 主線程等待超時,子線程可能發(fā)生長時間阻塞,死鎖
            if (count > 0 || !timeOutFlag) {
                // 設(shè)置為回滾
                isSubmit.set(false);
                log.info("主線線程等待超時,任務(wù)即將全部回滾");
                throw new RuntimeException("主線線程等待超時,任務(wù)即將全部回滾");
            }
            twoStageLatch.countDown();
        } catch (InterruptedException e) {
            log.error(e.getMessage());
            throw new RuntimeException(e.getMessage());
        }
        // 返回結(jié)果,是否執(zhí)行成功,事務(wù)提交即為執(zhí)行成功,事務(wù)回滾即為執(zhí)行失敗
        boolean flag = isSubmit.get();
        if(!flag){
            log.info("有線程發(fā)生異常,事務(wù)全部回滾");
            throw new RuntimeException("有線程發(fā)生異常,數(shù)據(jù)全部回滾");
        }else{
            log.info("主線程和子線程執(zhí)行無異常,事務(wù)全部提交");
        }
        executor.shutdown();
    }

    /**
     * 初始化屬性
     *
     * @param size 任務(wù)數(shù)量
     */
    private void innit(int size) {
        oneStageLatch = new CountDownLatch(size);
    }
}

?看下調(diào)用代碼示例

import com.hulei.studyproject.entity.User;
import com.hulei.studyproject.threadpool.ThreadPoolUtil;
import com.hulei.studyproject.transaction.MultiThreadingTransactionManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 測試多線程事務(wù)回滾
 * @Title: UserServiceImpl
 * @Description: TODO
 * @author: hulei
 * @date: 2023/7/31 17:41
 * @Version: 1.0
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class UserServiceImpl implements IUserService{

    private final JdbcTemplate jdbcTemplate;

    private final PlatformTransactionManager platformTransactionManager;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void insert() {
        System.out.println("主線程為:"+Thread.currentThread().getName());
        List<User> list = new ArrayList<>(){{
            add(new User("1","張三"));
        }};
        String sql = "insert into user(id,name) values (?,?)";
        jdbcTemplate.batchUpdate(sql,list, list.size(), (ps,d)->{
            ps.setString(1, d.getId());
            ps.setString(2,d.getName());
        });
        List<Runnable> runnableList = getRunnables();
        MultiThreadingTransactionManager multiThreadingTransactionManager = new MultiThreadingTransactionManager(platformTransactionManager,5, TimeUnit.SECONDS);
        ThreadPoolExecutor executor = ThreadPoolUtil.getThreadPool();
        multiThreadingTransactionManager.execute(runnableList,executor);
    }

    private List<Runnable> getRunnables() {
        List<Runnable> runnableList = new ArrayList<>();
        runnableList.add(()->{
            System.out.println("子線程1為:"+Thread.currentThread().getName());
            List<User> listOne = new ArrayList<>(){{
                add(new User("2","李四"));
            }};
            String sqlOne = "insert into user(id,name) values (?,?)";
            jdbcTemplate.batchUpdate(sqlOne,listOne, listOne.size(), (ps,d)->{
                ps.setString(1, d.getId());
                ps.setString(2,d.getName());
            });
            int a = 10/0;
        });
        runnableList.add(()->{
            System.out.println("子線程2為:"+Thread.currentThread().getName());
            List<User> listTwo = new ArrayList<>(){{
                add(new User("3","王五"));
            }};
            String sqlTwo = "insert into user(id,name) values (?,?)";
            jdbcTemplate.batchUpdate(sqlTwo,listTwo, listTwo.size(), (ps,d)->{
                ps.setString(1, d.getId());
                ps.setString(2,d.getName());
            });
        });
        return runnableList;
    }
}

我們在其中一個子線程處手工寫了一個會拋出異常的代碼

事務(wù)中使用compleablefuture,java,開發(fā)語言

?執(zhí)行結(jié)果如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

?可以看到控制臺報錯,數(shù)據(jù)庫執(zhí)行結(jié)果如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

主線程和子線程數(shù)據(jù)均未生成

把異常代碼注釋掉

事務(wù)中使用compleablefuture,java,開發(fā)語言執(zhí)行結(jié)果如下

事務(wù)中使用compleablefuture,java,開發(fā)語言

?無異常拋出,數(shù)據(jù)庫結(jié)果如下:

事務(wù)中使用compleablefuture,java,開發(fā)語言

?可以看到子線程和主線程操作的數(shù)據(jù)均已回滾。

但是以上方法有一定局限性,即主線程如果再子線程執(zhí)行后再拋出異常,則子線程無法回滾了,所以要求邏輯寫在子線程執(zhí)行之前

事務(wù)中使用compleablefuture,java,開發(fā)語言

?2處的代碼必須放在方法最后寫。文章來源地址http://www.zghlxwxcb.cn/news/detail-815437.html

到了這里,關(guān)于CompletableFuture異步編程事務(wù)及多數(shù)據(jù)源配置詳解(含gitee源碼)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 異步編程 - 06 基于JDK中的Future實現(xiàn)異步編程(中)_CompletableFuture源碼解析

    異步編程 - 06 基于JDK中的Future實現(xiàn)異步編程(中)_CompletableFuture源碼解析

    CompletableFuture實現(xiàn)了CompletionStage接口 。 1)一個CompletionStage代表著一個異步計算節(jié)點,當(dāng)另外一個CompletionStage計算節(jié)點完成后,當(dāng)前CompletionStage會執(zhí)行或者計算一個值;一個節(jié)點在計算終止時完成,可能反過來觸發(fā)其他依賴其結(jié)果的節(jié)點開始計算。 2)一個節(jié)點(CompletionStag

    2024年02月09日
    瀏覽(22)
  • 并發(fā)編程 | 從Future到CompletableFuture - 簡化 Java 中的異步編程

    在并發(fā)編程中,我們經(jīng)常需要處理多線程的任務(wù),這些任務(wù)往往具有依賴性,異步性,且需要在所有任務(wù)完成后獲取結(jié)果。Java 8 引入了 CompletableFuture 類,它帶來了一種新的編程模式,讓我們能夠以函數(shù)式編程的方式處理并發(fā)任務(wù),顯著提升了代碼的可讀性和簡潔性。 在這篇

    2024年02月13日
    瀏覽(27)
  • 從 Future 到 CompletableFuture:簡化 Java 中的異步編程

    在并發(fā)編程中,我們經(jīng)常需要處理多線程的任務(wù),這些任務(wù)往往具有依賴性,異步性,且需要在所有任務(wù)完成后獲取結(jié)果。Java 8 引入了 CompletableFuture 類,它帶來了一種新的編程模式,讓我們能夠以函數(shù)式編程的方式處理并發(fā)任務(wù),顯著提升了代碼的可讀性和簡潔性。 在這篇

    2024年02月11日
    瀏覽(20)
  • CompletableFuture與線程池:Java 8中的高效異步編程搭配

    摘要:在Java 8中,CompletableFuture和線程池的結(jié)合使用為程序員提供了一種高效、靈活的異步編程解決方案。本文將深入探討CompletableFuture和線程池結(jié)合使用的優(yōu)勢、原理及實際應(yīng)用案例,幫助讀者更好地理解并掌握這一技術(shù)。 隨著多核處理器的普及,應(yīng)用程序的性能和響應(yīng)能

    2024年02月07日
    瀏覽(22)
  • Java學(xué)習(xí)筆記-day06-響應(yīng)式編程Reactor與Callback、CompletableFuture三種形式異步編碼對比

    Reactor 是一個基于Reactive Streams規(guī)范的響應(yīng)式編程框架。它提供了一組用于構(gòu)建異步、事件驅(qū)動、響應(yīng)式應(yīng)用程序的工具和庫。Reactor 的核心是 Flux (表示一個包含零到多個元素的異步序列)和 Mono 表示一個包含零或一個元素的異步序列)。 Reactor 通過提供響應(yīng)式的操作符,如

    2024年02月03日
    瀏覽(38)
  • 千云物流- 多數(shù)據(jù)源事務(wù)管理

    Spring只是個容器,因此它并不做任何事務(wù)的具體實現(xiàn)。他只是提供了事務(wù)管理的接口PlatformTransactionManager,具體內(nèi)容由就由各個事務(wù)管理器來實現(xiàn)。 DataSourceTransactionManager:位于org.springframework.jdbc.datasource包中,數(shù)據(jù)源事務(wù)管理器,提供對單個javax.sql.DataSource事務(wù)管理,用于S

    2024年02月02日
    瀏覽(20)
  • Spring Boot 多數(shù)據(jù)源及事務(wù)解決方案

    Spring Boot 多數(shù)據(jù)源及事務(wù)解決方案

    一個主庫和N個應(yīng)用庫的數(shù)據(jù)源,并且會同時操作主庫和應(yīng)用庫的數(shù)據(jù),需要解決以下兩個問題: 如何動態(tài)管理多個數(shù)據(jù)源以及切換? 如何保證多數(shù)據(jù)源場景下的數(shù)據(jù)一致性(事務(wù))? 本文主要探討這兩個問題的解決方案,希望能對讀者有一定的啟發(fā)。 通過擴展Spring提供的抽象

    2024年02月10日
    瀏覽(20)
  • 解決多數(shù)據(jù)源的事務(wù)問題 - 基于springboot--mybatis

    解決多數(shù)據(jù)源的事務(wù)問題 - 基于springboot--mybatis

    在Spring Boot和MyBatis中,我們有時需要在方法中同時使用兩個不同的數(shù)據(jù)庫,但使用 @Transactional 注解會變得復(fù)雜。這時我們可以用一種更靈活的方法來處理。 想象一下這樣的場景:我們有兩個數(shù)據(jù)庫,我們希望在一個方法中同時操作它們,但是普通的 @Transactional 注解變得不太

    2024年02月01日
    瀏覽(17)
  • Spring Boot多數(shù)據(jù)源事務(wù)@DSTransactional的使用

    Spring Boot 集成com.baomidou,引入dynamic-datasource依賴,實現(xiàn)多數(shù)據(jù)源,這里說下事務(wù)問題: 1、一個方法中使用同一個數(shù)據(jù)源; 2、一個方法中使用了多個數(shù)據(jù)源; 這里把dao、service列出來 1、dao層 ? 2、service層? spring boot實現(xiàn)多數(shù)據(jù)源:Spring Boot集成Druid實現(xiàn)多數(shù)據(jù)源的兩種方式_濤

    2024年02月11日
    瀏覽(17)
  • Spring | 基于SpringBoot的多數(shù)據(jù)源實戰(zhàn) - 使用seata實現(xiàn)多數(shù)據(jù)源的全局事務(wù)管理

    Spring | 基于SpringBoot的多數(shù)據(jù)源實戰(zhàn) - 使用seata實現(xiàn)多數(shù)據(jù)源的全局事務(wù)管理

    在軟件開發(fā)中, 多數(shù)據(jù)源 的應(yīng)用越來越普遍,特別是在 微服務(wù)架構(gòu) 和 業(yè)務(wù)模塊化 的場景下。多數(shù)據(jù)源能夠讓不同的業(yè)務(wù)模塊和微服務(wù)擁有各自獨立的數(shù)據(jù)存儲,大大提高了系統(tǒng)的靈活性和可維護性。本文將深入探討多數(shù)據(jù)源的配置和實施,以及在 Spring Boot 環(huán)境下,如何通

    2024年02月07日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包