前言
在多線程編碼過程中,我們會(huì)用到多線程來提升運(yùn)行效率。比如我們的Executors創(chuàng)建線程池,程序盡可能的壓榨CPU資源來提升我們程序吞吐量。但是過度的使用線程,也會(huì)將我們CPU資源榨干,從而讓我們系統(tǒng)不能正常的提供服務(wù)。故今天我們引入JUC并發(fā)包下面的semaphore并發(fā)類,該類可以同時(shí)允許定量線程執(zhí)行從而達(dá)到控制并發(fā)的目的。
Semaphore原理
Semaphore并發(fā)類提供了兩個(gè)核心方法:acquire()方法和release()方法。acquire()方法表示獲取一個(gè)許可,如果沒有則等待,release()方法則是釋放對(duì)應(yīng)的許可。Semaphore維護(hù)了當(dāng)前訪問的個(gè)數(shù),通過提供同步機(jī)制來控制同時(shí)訪問的個(gè)數(shù)。
Semaphore源碼解析
內(nèi)部繼承AQS保證同步
Semaphore 與AQS關(guān)系圖:
進(jìn)入java.util.concurrent 包下 Semaphore 并發(fā)類查看源碼:
//繼承AQS同步阻塞隊(duì)列來實(shí)現(xiàn)同步功能
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
如上所示,首先Semaphore并發(fā)內(nèi)部類繼承AbstractQueuedSynchronizer 同步阻塞類,所以可以得出Semaphore是通過AQS機(jī)制來保證同步。AQS不明白的同學(xué)可以看我之前的博文,大概就是內(nèi)部一個(gè)state狀態(tài),獲取到資源 state 就加一,釋放資源 state 就減一,當(dāng) state == 0 的時(shí)候表示阻塞隊(duì)列中的其他線程可以獲取該資源。
繼續(xù)查看源碼:
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
如源碼所示,Semaphore構(gòu)造方法默認(rèn)提供非公平鎖同步構(gòu)造方法,也提供了一個(gè)用戶自定義是否公平鎖的構(gòu)造方法。是否公平鎖同步直接影響阻塞隊(duì)列中的哪個(gè)線程可以獲取資源,公平鎖是按照阻塞順序獲取資源,非公平鎖是多個(gè)線程爭(zhēng)搶資源。當(dāng)然構(gòu)造方法必須傳入并發(fā)許可的總數(shù),這個(gè)總數(shù)直接影響后續(xù)我們可以同時(shí)獲取多少個(gè)許可。
acquire獲取許可
查看Semaphore 獲取許可的源碼:
//是否可以獲取許可
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
//是否可以獲取許可,并提供超時(shí)獲取機(jī)制
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//是否可以獲取多個(gè)許可,并提供超時(shí)獲取機(jī)制
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
//提供一個(gè)無參獲取許可方法,默認(rèn)獲取一個(gè)許可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//不間斷的獲取許可,如果獲取不到就阻塞
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//不間斷的獲取多個(gè)許可,如果獲取不到就阻塞
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
如源碼所示,我獲取許可有很多的方法。有是否可以獲取許可,超時(shí)是否可以獲取許可。當(dāng)然還有獲取許可的方法,以及阻塞獲取許可的多個(gè)方法,這些方法本質(zhì)上都是調(diào)用父AQS中的獲取資源許可的方法,同學(xué)們可以選擇自己適用的方法進(jìn)行調(diào)用。
release釋放許可
查看Semaphore 釋放許可的源碼:
//默認(rèn)釋放一個(gè)許可
public void release() {
sync.releaseShared(1);
}
//同時(shí)釋放多個(gè)許可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
//調(diào)用父AQS的釋放資源的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
如源碼所示,Semaphore提供了多個(gè)釋放許可的方法,我們可以根據(jù)實(shí)際需要選擇釋放許可的方法。
值得注意的是release() 方法調(diào)用了自己內(nèi)部類Sync的釋放資源方法,而Sync又是繼承AQS阻塞隊(duì)列并調(diào)用了父類doReleaseShared() 釋放資源的方法。
實(shí)戰(zhàn)演示
上面博文我們介紹了Semaphore并發(fā)類是一個(gè)同步類,它繼承了AQS阻塞隊(duì)列。Semaphore主要提供了acquire() 獲取許可與release() 釋放許可的方法,通過這兩個(gè)方法我們可以實(shí)現(xiàn)線程并發(fā)數(shù)目的功能。下面我們簡(jiǎn)單模擬一下限制并發(fā)數(shù)目。
1、Semaphore測(cè)試類
測(cè)試類提供一個(gè)定量5的線程池,用countdownlatch 讓主線程等待子線程執(zhí)行完成,Semaphore保證每次只有3個(gè)線程執(zhí)行。為了保證測(cè)試結(jié)果可以追溯性,我們?cè)跇I(yè)務(wù)邏輯中讓線程睡眠3s。
/**
* Semaphore test
* @author senfel
* @date 2023/4/6 11:38
* @return
*/
@SpringBootTest
class ConcurrentApplicationTests {
/**
* 日期格式
*/
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
@Test
void testSemaphore() throws Exception {
//創(chuàng)建線程為5的線程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
//同時(shí)運(yùn)行3個(gè)線程運(yùn)行
Semaphore semaphore = new Semaphore(3);
//等待執(zhí)行完成
CountDownLatch countDownLatch = new CountDownLatch(10);
for(int i = 0;i<10;i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try{
//獲取許可
semaphore.acquire();
//業(yè)務(wù)邏輯
executeFun();
//釋放許可
semaphore.release();
countDownLatch.countDown();
}catch (Exception e){
e.printStackTrace();
}
}
});
}
countDownLatch.await();
System.err.println("執(zhí)行完成");
executorService.shutdown();
}
/**
* 執(zhí)行業(yè)務(wù)方法
* @author senfel
* @date 2023/4/6 11:13
* @return void
*/
private void executeFun() {
try {
String startTime = formatter.format(LocalDateTime.now());
Thread.sleep(3000);
System.err.println("當(dāng)前執(zhí)行線程:"+Thread.currentThread().getName()+",開始時(shí)間:"+startTime+",結(jié)束時(shí)間"+formatter.format(LocalDateTime.now()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、執(zhí)行測(cè)試方法
執(zhí)行testSemaphore() 方法,我們可以在控制臺(tái)看到如下結(jié)果:
當(dāng)前執(zhí)行線程:pool-1-thread-1,開始時(shí)間:11:37:00,結(jié)束時(shí)間11:37:03
當(dāng)前執(zhí)行線程:pool-1-thread-3,開始時(shí)間:11:37:00,結(jié)束時(shí)間11:37:03
當(dāng)前執(zhí)行線程:pool-1-thread-2,開始時(shí)間:11:37:00,結(jié)束時(shí)間11:37:03
當(dāng)前執(zhí)行線程:pool-1-thread-2,開始時(shí)間:11:37:03,結(jié)束時(shí)間11:37:06
當(dāng)前執(zhí)行線程:pool-1-thread-1,開始時(shí)間:11:37:03,結(jié)束時(shí)間11:37:06
當(dāng)前執(zhí)行線程:pool-1-thread-3,開始時(shí)間:11:37:03,結(jié)束時(shí)間11:37:06
當(dāng)前執(zhí)行線程:pool-1-thread-1,開始時(shí)間:11:37:06,結(jié)束時(shí)間11:37:09
當(dāng)前執(zhí)行線程:pool-1-thread-4,開始時(shí)間:11:37:06,結(jié)束時(shí)間11:37:09
當(dāng)前執(zhí)行線程:pool-1-thread-2,開始時(shí)間:11:37:06,結(jié)束時(shí)間11:37:09
當(dāng)前執(zhí)行線程:pool-1-thread-5,開始時(shí)間:11:37:09,結(jié)束時(shí)間11:37:12
執(zhí)行完成
根據(jù)執(zhí)行結(jié)果我們可以發(fā)現(xiàn)每3s有3個(gè)線程執(zhí)行并輸出,得出結(jié)論Semaphore可以保證每次只能同時(shí)3個(gè)線程執(zhí)行!?。?!文章來源:http://www.zghlxwxcb.cn/news/detail-430657.html
總結(jié)
多線程控制并發(fā)數(shù)目工具類Semaphore內(nèi)部繼承AQS抽象阻塞隊(duì)列,并繼承了其同步獲取、釋放資源的方法。Semaphore提供了acquire()獲取許可、release()釋放許可方法,底層當(dāng)然調(diào)用的是AQS內(nèi)部方法來滿足同步以保證限制并發(fā)線程運(yùn)行數(shù)目。文章來源地址http://www.zghlxwxcb.cn/news/detail-430657.html
到了這里,關(guān)于多線程控制并發(fā)數(shù)目工具類Semaphore的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!