Semaphore:如何快速實(shí)現(xiàn)一個(gè)限流器?
信號(hào)量模型
- 信號(hào)量模型還是很簡(jiǎn)單的,可以簡(jiǎn)單概括為:一個(gè)計(jì)數(shù)器,一個(gè)等待隊(duì)列,三個(gè)方法。
- 在信號(hào)量模型里,計(jì)數(shù)器和等待隊(duì)列對(duì)外是透明的,所以只能通過(guò)信號(hào)量模型提供的三個(gè)方法來(lái)訪問(wèn)它們,這三個(gè)方法分別是:init()、down() 和 up()。
- init():設(shè)置計(jì)數(shù)器的初始值。
- down():計(jì)數(shù)器的值減 1;如果此時(shí)計(jì)數(shù)器的值小于 0,則當(dāng)前線程將被阻塞,否則當(dāng)前線程可以繼續(xù)執(zhí)行。
- up():計(jì)數(shù)器的值加 1;如果此時(shí)計(jì)數(shù)器的值小于或者等于 0,則喚醒等待隊(duì)列中的一個(gè)線程,并將其從等待隊(duì)列中移除。
- 在 Java SDK 里面,信號(hào)量模型是由 java.util.concurrent.Semaphore 實(shí)現(xiàn)的,Semaphore 這個(gè)類能夠保證這三個(gè)方法都是原子操作。
- 在 Java SDK 并發(fā)包里,down() 和 up() 對(duì)應(yīng)的則是 acquire() 和 release()。
如何使用信號(hào)量
快速實(shí)現(xiàn)一個(gè)限流器
- 實(shí)現(xiàn)一個(gè)互斥鎖,僅僅是 Semaphore 的部分功能,Semaphore 還有?個(gè)功能是 Lock 不容易實(shí)現(xiàn)的,那就是:Semaphore 可以允許多個(gè)線程訪問(wèn)一個(gè)臨界區(qū)。
- 比較常見(jiàn)的需求就是我們工作中遇到的各種池化資源,例如連接池、對(duì)象池、線程池等等。
- 其中,數(shù)據(jù)庫(kù)連接池,在同一時(shí)刻,一定是允許多個(gè)線程同時(shí)使用連接池的,當(dāng)然,每個(gè)連接在被釋放前,是不允許其他線程使用的。
- 所謂對(duì)象池,指的是一次性創(chuàng)建出 N 個(gè)對(duì)象,之后所有的線程重復(fù)利用這 N 個(gè)對(duì)象,當(dāng)然對(duì)象在被釋放前,也是不允許其他線程使用的。
- 對(duì)象池,可以用 List 保存實(shí)例對(duì)象,這個(gè)很簡(jiǎn)單。
- 但關(guān)鍵是限流器的設(shè)計(jì),這里的限流,指的是不允許多于 N 個(gè)線程同時(shí)進(jìn)入臨界區(qū)。
-
那如何快速實(shí)現(xiàn)?個(gè)這樣的限流器呢?如果我們把計(jì)數(shù)器的值設(shè)置成對(duì)象池里對(duì)象的個(gè)數(shù) N,就能完美解決對(duì)象池的限流問(wèn)題了。
import java.util.List; import java.util.Vector; import java.util.concurrent.Semaphore; import java.util.function.Function; public class ObjPool<T, R> { final List<T> pool; // ?信號(hào)量實(shí)現(xiàn)限流器 final Semaphore semaphore; // 構(gòu)造函數(shù) ObjPool(int size, T t) { pool = new Vector<T>(){}; for (int i = 0; i < size; i++) { pool.add(t); } semaphore = new Semaphore(size); } // 利?對(duì)象池的對(duì)象,調(diào)? func // function 的作用是轉(zhuǎn)換,將一個(gè)值轉(zhuǎn)為另外一個(gè)值 R exec(Function<T, R> function) throws InterruptedException { T t = null; semaphore.acquire(); try { t = pool.remove(0); return function.apply(t); } finally { pool.add(t); semaphore.release(); } } public static void main(String[] args) throws InterruptedException { // 創(chuàng)建對(duì)象池 ObjPool<Long, String> objPool = new ObjPool<Long, String>(10, 2L); // 通過(guò)對(duì)象池獲取 t,之后執(zhí)? objPool.exec(t -> { System.out.println(t); return t.toString(); }); } }
- 我們用一個(gè) List來(lái)保存對(duì)象實(shí)例,用 Semaphore 實(shí)現(xiàn)限流器。
- 關(guān)鍵的代碼是 ObjPool 里面的 exec() 方法,這個(gè)方法里面實(shí)現(xiàn)了限流的功能。
- 在這個(gè)方法里面,我們首先調(diào)用 acquire() 方法(與之匹配的是在 finally 里面調(diào)用 release() 方法),假設(shè)對(duì)象池的大小是 10,信號(hào)量的計(jì)數(shù)器初始化為 10,那么前 10 個(gè)線程調(diào)用 acquire() 方法,都能繼續(xù)執(zhí)行,而其他線程則會(huì)阻塞在 acquire() 方法上。
- 我們?yōu)槊總€(gè)線程分配了一個(gè)對(duì)象 t(這個(gè)分配工作是通過(guò) pool.remove(0) 實(shí)現(xiàn)的),分配完之后會(huì)執(zhí)行一個(gè)回調(diào)函數(shù) func,而函數(shù)的參數(shù)正是前面分配的對(duì)象 t ;執(zhí)行完回調(diào)函數(shù)之后,它們就會(huì)釋放對(duì)象(這個(gè)釋放工作是通過(guò) pool.add(t) 實(shí)現(xiàn)的),同時(shí)調(diào)用 release() 方法來(lái)更新信號(hào)量的計(jì)數(shù)器。
- 如果此時(shí)信號(hào)量里計(jì)數(shù)器的值小于等于 0,那么說(shuō)明有線程在等待,此時(shí)會(huì)自動(dòng)喚醒等待的線程。
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-472092.html
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-472092.html
到了這里,關(guān)于《Java并發(fā)編程實(shí)戰(zhàn)》課程筆記(九)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!