目錄
前言
一、springboot多線程(聲明式)的使用方法?
二、自定義注解實現(xiàn)多線程事務控制
1.自定義注解
2.AOP內(nèi)容
3.注解使用Demo
前言
本文是基于springboot的@Async注解開啟多線程, 并通過自定義注解和AOP實現(xiàn)的多線程事務, 避免繁瑣的手動提交/回滾事務? (CV即用, 參數(shù)齊全, 無需配置)
一、springboot多線程(聲明式)的使用方法?
1、springboot提供了注解@Async來使用線程池, 具體使用方法如下:
(1) 在啟動類(配置類)添加@EnableAsync來開啟線程池
(2) 在需要開啟子線程的方法上添加注解@Async
注意: 框架默認 ----->? ?來一個請求開啟一個線程,在高并發(fā)下容易內(nèi)存溢出
所以使用時需要配置自定義線程池, 如下:
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
@Bean("threadPoolTaskExecutor")//自定義線程池名稱
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//線程池創(chuàng)建的核心線程數(shù),線程池維護線程的最少數(shù)量,即使沒有任務需要執(zhí)行,也會一直存活
executor.setCorePoolSize(16);
//如果設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
//executor.setAllowCoreThreadTimeOut(true);
//阻塞隊列 當核心線程數(shù)達到最大時,新任務會放在隊列中排隊等待執(zhí)行
executor.setQueueCapacity(124);
//最大線程池數(shù)量,當線程數(shù)>=corePoolSize,且任務隊列已滿時。線程池會創(chuàng)建新線程來處理任務
//任務隊列已滿時, 且當線程數(shù)=maxPoolSize,,線程池會拒絕處理任務而拋出異常
executor.setMaxPoolSize(64);
//當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數(shù)量=corePoolSize
//允許線程空閑時間30秒,當maxPoolSize的線程在空閑時間到達的時候銷毀
//如果allowCoreThreadTimeout=true,則會直到線程數(shù)量=0
executor.setKeepAliveSeconds(30);
//spring 提供的 ThreadPoolTaskExecutor 線程池,是有setThreadNamePrefix() 方法的。
//jdk 提供的ThreadPoolExecutor 線程池是沒有 setThreadNamePrefix() 方法的
executor.setThreadNamePrefix("自定義線程池-");
// rejection-policy:拒絕策略:當線程數(shù)已經(jīng)達到maxSize的時候,如何處理新任務
// CallerRunsPolicy():交由調(diào)用方線程運行,比如 main 線程;如果添加到線程池失敗,那么主線程會自己去執(zhí)行該任務,不會等待線程池中的線程去執(zhí)行, (個人推薦)
// AbortPolicy():該策略是線程池的默認策略,如果線程池隊列滿了丟掉這個任務并且拋出RejectedExecutionException異常。
// DiscardPolicy():如果線程池隊列滿了,會直接丟掉這個任務并且不會有任何異常
// DiscardOldestPolicy():丟棄隊列中最老的任務,隊列滿了,會將最早進入隊列的任務刪掉騰出空間,再嘗試加入隊列
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//設置線程池關閉的時候等待所有任務都完成再繼續(xù)銷毀其他的Bean,這樣這些異步任務的銷毀就會先于Redis線程池的銷毀
executor.setWaitForTasksToCompleteOnShutdown(true);
//設置線程池中任務的等待時間,如果超過這個時候還沒有銷毀就強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
?開啟子線程方法:? 在需要開啟線程的方法上添加 注解@Async("threadPoolTaskExecutor")即可, 其中注解中的參數(shù)為自定義線程池的名稱.
二、自定義注解實現(xiàn)多線程事務控制
1.自定義注解
本文是使用了兩個注解共同作用實現(xiàn)的, 主線程當做協(xié)調(diào)者,各子線程作為參與者
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 多線程事務注解: 主事務
*
* @author zlj
* @since 2022/11/3
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
int value();//子線程數(shù)量
}
package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 多線程事務注解: 子事務
*
* @author zlj
* @since 2022/11/3
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
String value() default "";
}
解釋:
兩個注解都是用在方法上的.須配合@Transactional(rollbackFor = Exception.class)一起使用
@MainTransaction注解 用在調(diào)用方, 其參數(shù)為必填, 參數(shù)值為本方法中調(diào)用的方法開啟的線程數(shù), 如: 在這個方法中調(diào)用的方法中有2個方法用@Async注解開啟了子線程, 則參數(shù)為@MainTransaction(2), 另外如果未使用@MainTransaction注解, 則直接已無多線程事務執(zhí)行(不影響方法的單線程事務)
@SonTransaction注解 用在被調(diào)用方(開啟線程的方法), 無需傳入?yún)?shù)
2.AOP內(nèi)容
代碼如下:
package com.example.aop;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 多線程事務
*
* @author zlj
* @since 2022/11/3
*/
@Aspect
@Component
public class TransactionAop {
//用來存儲各線程計數(shù)器數(shù)據(jù)(每次執(zhí)行后會從map中刪除)
private static final Map<String, Object> map = new HashMap<>();
@Resource
private PlatformTransactionManager transactionManager;
@Around("@annotation(mainTransaction)")
public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
//當前線程名稱
Thread thread = Thread.currentThread();
String threadName = thread.getName();
//初始化計數(shù)器
CountDownLatch mainDownLatch = new CountDownLatch(1);
CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的參數(shù), 為子線程的數(shù)量
// 用來記錄子線程的運行狀態(tài),只要有一個失敗就變?yōu)閠rue
AtomicBoolean rollBackFlag = new AtomicBoolean(false);
// 用來存每個子線程的異常,把每個線程的自定義異常向vector的首位置插入,其余異常向末位置插入,避免線程不安全,所以使用vector代替list
Vector<Throwable> exceptionVector = new Vector<>();
map.put(threadName + "mainDownLatch", mainDownLatch);
map.put(threadName + "sonDownLatch", sonDownLatch);
map.put(threadName + "rollBackFlag", rollBackFlag);
map.put(threadName + "exceptionVector", exceptionVector);
try {
joinPoint.proceed();//執(zhí)行方法
} catch (Throwable e) {
exceptionVector.add(0, e);
rollBackFlag.set(true);//子線程回滾
mainDownLatch.countDown();//放行所有子線程
}
if (!rollBackFlag.get()) {
try {
// sonDownLatch等待,直到所有子線程執(zhí)行完插入操作,但此時還沒有提交事務
sonDownLatch.await();
mainDownLatch.countDown();// 根據(jù)rollBackFlag狀態(tài)放行子線程的await處,告知是回滾還是提交
} catch (Exception e) {
rollBackFlag.set(true);
exceptionVector.add(0, e);
}
}
if (CollectionUtils.isNotEmpty(exceptionVector)) {
map.remove(threadName + "mainDownLatch");
map.remove(threadName + "sonDownLatch");
map.remove(threadName + "rollBackFlag");
map.remove(threadName + "exceptionVector");
throw exceptionVector.get(0);
}
}
@Around("@annotation(com.huigu.common.anno.SonTransaction)")
public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
Thread thread = (Thread) args[args.length - 1];
String threadName = thread.getName();
CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
if (mainDownLatch == null) {
//主事務未加注解時, 直接執(zhí)行子事務
joinPoint.proceed();//這里最好的方式是:交由上面的thread來調(diào)用此方法,但我沒有找尋到對應api,只能直接放棄事務, 歡迎大神來優(yōu)化, 留言分享
return;
}
CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
//如果這時有一個子線程已經(jīng)出錯,那當前線程不需要執(zhí)行
if (rollBackFlag.get()) {
sonDownLatch.countDown();
return;
}
DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 開啟事務
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 設置事務隔離級別
TransactionStatus status = transactionManager.getTransaction(def);
try {
joinPoint.proceed();//執(zhí)行方法
sonDownLatch.countDown();// 對sonDownLatch-1
mainDownLatch.await();// 如果mainDownLatch不是0,線程會在此阻塞,直到mainDownLatch變?yōu)?
// 如果能執(zhí)行到這一步說明所有子線程都已經(jīng)執(zhí)行完畢判斷如果atomicBoolean是true就回滾false就提交
if (rollBackFlag.get()) {
transactionManager.rollback(status);
} else {
transactionManager.commit(status);
}
} catch (Throwable e) {
exceptionVector.add(0, e);
// 回滾
transactionManager.rollback(status);
// 并把狀態(tài)設置為true
rollBackFlag.set(true);
mainDownLatch.countDown();
sonDownLatch.countDown();
}
}
}
擴展說明: CountDownLatch是什么?
一個同步輔助類
創(chuàng)建對象時: 用給定的數(shù)字初始化 CountDownLatch
countDown() 方法: 使計數(shù)減1
await() 方法: 阻塞當前線程, 直至當前計數(shù)到達零。
本文中:
用 計數(shù) 1 初始化的 mainDownLatch?當作一個簡單的開/關鎖存器,或入口:在通過調(diào)用 countDown() 的線程打開入口前,所有調(diào)用 await 的線程都一直在入口處等待。
用 子線程數(shù)量 初始化的 sonDownLatch?可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
3、注解使用Demo
任務方法:文章來源:http://www.zghlxwxcb.cn/news/detail-408182.html
package com.example.demo.service;
import com.example.demo.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author zlj
* @since 2022/11/14
*/
@Service
public class SonService {
/**
* 參數(shù)說明: 以下4個方法參數(shù)和此相同
*
* @param args 業(yè)務中需要傳遞的參數(shù)
* @param thread 調(diào)用者的線程, 用于aop獲取參數(shù), 不建議以方法重寫的方式簡略此參數(shù),
* 在調(diào)用者方法中可以以此參數(shù)為標識計算子線程的個數(shù)作為注解參數(shù),避免線程參數(shù)計算錯誤導致鎖表
* 傳參時參數(shù)固定為: Thread.currentThread()
*/
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod1(String args, Thread thread) {
System.out.println(args + "開啟了線程");
}
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod2(String args1, String args2, Thread thread) {
System.out.println(args1 + "和" + args2 + "開啟了線程");
}
@Transactional(rollbackFor = Exception.class)
@Async("threadPoolTaskExecutor")
@SonTransaction
public void sonMethod3(String args, Thread thread) {
System.out.println(args + "開啟了線程");
}
//sonMethod4方法沒有使用線程池
@Transactional(rollbackFor = Exception.class)
public void sonMethod4(String args) {
System.out.println(args + "沒有開啟線程");
}
}
調(diào)用方:文章來源地址http://www.zghlxwxcb.cn/news/detail-408182.html
package com.example.demo.service;
import com.example.demo.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author zlj
* @since 2022/11/14
*/
@Service
public class MainService {
@Resource
private SonService sonService;
@MainTransaction(3)//調(diào)用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async開啟了線程, 所以參數(shù)為: 3
@Transactional(rollbackFor = Exception.class)
public void test1() {
sonService.sonMethod1("路飛", Thread.currentThread());
sonService.sonMethod2("索隆", "山治", Thread.currentThread());
sonService.sonMethod3("娜美", Thread.currentThread());
sonService.sonMethod4("羅賓");
}
/*
* 有的業(yè)務中存在if的多種可能, 每一種走向調(diào)用的方法(開啟線程的方法)數(shù)量如果不同, 這時可以選擇放棄使用@MainTransaction注解避免鎖表
* 這時候如果發(fā)生異常會導致多線程不能同時回滾, 可根據(jù)業(yè)務自己權衡是否使用
*/
@Transactional(rollbackFor = Exception.class)
public void test2() {
sonService.sonMethod1("路飛", Thread.currentThread());
sonService.sonMethod2("索隆", "山治", Thread.currentThread());
sonService.sonMethod3("娜美", Thread.currentThread());
sonService.sonMethod4("羅賓");
}
}
到了這里,關于自定義注解實現(xiàn)springboot 多線程事務(基于@Async注解的多線程)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!