前言
大概有快兩周沒有發(fā)文了,這段時(shí)間不斷的充實(shí)自己,算算時(shí)間,也到了該收獲的時(shí)候,今天帶來一篇JUC詳解,但說實(shí)話,我也不敢自信它詳不詳細(xì)。JUC說白了就是多線程,學(xué)Java不久的人知道多線程,恐怕還不知道JUC是什么。在這篇博客中,博主將對JUC做一個(gè)自認(rèn)為比較全面的講解,為了方便大家理解,博主盡量用自己的語言給大家總結(jié),避免照搬各種資料里面生澀的文字讓大家不解其意,不出意外這篇博客會(huì)很長,目前還沒辦法估計(jì)具體多長,但總之應(yīng)該會(huì)比較長。在講解的同時(shí),博主也會(huì)做一些多線程的引申,引申到哪也不好說,只能邊寫邊想,遇到博主自己也不是太理解的,會(huì)標(biāo)注,大家可做查詢或討論。JUC是塊硬骨頭,你要說難,那倒未必多難,你要說簡單?我想,能說明白的人恐怕也不會(huì)多,今天,我們就來啃一啃這塊難啃的骨頭,挖一挖JUC的祖墳,看看有沒有意外收獲。
線程基礎(chǔ)
進(jìn)程與線程
進(jìn)程
從JVM的角度來理解進(jìn)程我覺得會(huì)稍微簡單一點(diǎn),JVM是虛擬機(jī),算得上是進(jìn)程的載體,也就是操作系統(tǒng)了,那么所有的操作系統(tǒng)iOS/Android/Windows/macOS/Linux,其所運(yùn)行的設(shè)備,比如手機(jī),電腦都算得上是一個(gè)進(jìn)程的載體設(shè)備。往細(xì)了說,每一個(gè)單獨(dú)運(yùn)行的實(shí)例就是一個(gè)進(jìn)程,實(shí)例即應(yīng)用,所以一個(gè)單獨(dú)運(yùn)行的應(yīng)用程序,不管手機(jī)也好,電腦也罷,都是一個(gè)單獨(dú)的進(jìn)程。
用一句老話說:進(jìn)程是資源分配和管理的最小單位。所以進(jìn)程必定有自己的內(nèi)存空間和執(zhí)行任務(wù)的線程。用一張圖表示如下:
大體上是這樣,適用于各種操作系統(tǒng),對于進(jìn)程其實(shí)了解即可,更詳細(xì)的博主也只能去查資料了,但畢竟已經(jīng)脫離Java在應(yīng)用上的范疇,有興趣的自己查。
線程
如果說進(jìn)程是資源分配和管理的最小單位,那么線程就是進(jìn)程中程序執(zhí)行的最小單元。
但是線程并不是越多越好,因?yàn)檎嬲亩嗑€程依賴于操作系統(tǒng)多核的特性,如果只有一核,你就是說破天他也只能是單線程。之所以你覺得線程是在并行,是因?yàn)榫€程在單核CPU上快速的上下文切換。
至于什么是上下文切換,其實(shí)很好理解,就是多個(gè)線程排排坐,一人份配一小段時(shí)間片,輪流執(zhí)行,這個(gè)時(shí)間片非常短,在肉眼上造成了同時(shí)執(zhí)行的假象,讓人認(rèn)為是并行的。線程開始執(zhí)行叫加載,停止叫做掛起。而這種分配時(shí)間片工作的術(shù)語叫:CPU時(shí)間片輪轉(zhuǎn)機(jī)制。
時(shí)間片太短會(huì)頻繁切換線程,而掛起和加載都是要消耗CPU資源的,太長就會(huì)對有些比較短的交互請求不友好,人家完成工作了還需要等待時(shí)間片結(jié)束才能切換下一個(gè)線程。所以時(shí)間片設(shè)置比較講究,通常來說100ms左右是個(gè)比較友好的時(shí)間。
并行與并發(fā)
剛剛提到了并行,提到了多線程,那么我們就有必要來掰扯下多線程并行與并發(fā)的區(qū)別。
先說并發(fā),博主可以很負(fù)責(zé)任的告訴大家,并發(fā)是多個(gè)任務(wù)一起開始執(zhí)行,也不是只有一個(gè)線程在工作,而是同一時(shí)間正在執(zhí)行的線程只有一個(gè),通過頻繁的上下文切換來達(dá)到讓人以為一起執(zhí)行的效果,但實(shí)際同一時(shí)空下,只有一個(gè)線程在工作。
再說并行,并行是真正的多線程執(zhí)行,多個(gè)任務(wù)由多個(gè)線程共同執(zhí)行,但并行并不意味著每個(gè)任務(wù)都有一個(gè)線程來執(zhí)行,這要看線程的數(shù)量,雖然多線程有利于提高程序執(zhí)行效率,但切記,線程并不是越多越好,這就像吃飯,吃多了也撐得慌。
常見方法
run()&start()?
這兩個(gè)方法是用來啟動(dòng)線程的,但卻有著本質(zhì)的區(qū)別,線程在創(chuàng)建好之后要啟動(dòng),必須用start(),它是真正意義上的啟動(dòng)線程,它會(huì)讓線程進(jìn)入就緒狀態(tài)開始等待時(shí)間片分配,然后等到時(shí)間片分到后,才會(huì)調(diào)用run來開始工作。
需要注意的是,線程一旦啟動(dòng),start方法就不能再次調(diào)用,一個(gè)線程只能被啟動(dòng)一次,這個(gè)需要看下start的源碼:
//start源碼分析
public synchronized void start() {
/**
Java里面創(chuàng)建線程之后,必須要調(diào)用start方法才能創(chuàng)建一個(gè)線程,
該方法會(huì)通過虛擬機(jī)啟動(dòng)一個(gè)本地線程,本地線程的創(chuàng)建會(huì)調(diào)用當(dāng)前
系統(tǒng)去創(chuàng)建線程的方法進(jìn)行創(chuàng)建線程。
最終會(huì)調(diào)用run()將線程真正執(zhí)行起來
0這個(gè)狀態(tài),等于‘New’這個(gè)狀態(tài)。
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* 線程會(huì)加入到線程分組,然后執(zhí)行start0() */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
執(zhí)行的流程大致如下:
從run方法的使用來說,它就像一個(gè)成員方法,但又不全是,一般我們會(huì)重寫此方法,并在里面我們會(huì)執(zhí)行一些業(yè)務(wù)代碼。雖然它可以單獨(dú)執(zhí)行,也能重復(fù)執(zhí)行,但它不會(huì)啟動(dòng)線程這個(gè)事情的本質(zhì)不會(huì)改變。
suspend&()resume()&stop()
這三個(gè)方法對應(yīng)的是暫停、恢復(fù)和中止,我們以幾個(gè)事例來說明下:
public class Thr {
private static class MyThread implements Runnable{
@Override
public void run() {
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
while (true){
System.out.println(Thread.currentThread().getName()+"run at"+dateFormat.format(new Date()));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread());
//開啟線程
System.out.println("開啟線程");
thread.start();
TimeUnit.SECONDS.sleep(3);
//暫停線程
System.out.println("暫停線程");
thread.suspend();
TimeUnit.SECONDS.sleep(3);
//恢復(fù)線程
System.out.println("恢復(fù)線程");
thread.resume();
TimeUnit.SECONDS.sleep(3);
//中止線程
System.out.println("中止線程");
thread.stop();
}
}
其執(zhí)行的結(jié)果如下:
開啟線程 my threadrun at22:42:24 my threadrun at22:42:25 my threadrun at22:42:26 暫停線程 恢復(fù)線程 my threadrun at22:42:30 my threadrun at22:42:31 my threadrun at22:42:32 中止線程?
雖然這三個(gè)方法使用我們說了,但還是要說一句,這三個(gè)方法已經(jīng)被Java標(biāo)注為過期,雖然還沒刪除,但也不建議繼續(xù)使用了,?因?yàn)榫€程在暫停的時(shí)候仍然占用著鎖,很容易導(dǎo)致死鎖,我們通過一段代碼來看看原因:
public class Thr {
private static Object obj = new Object();//作為一個(gè)鎖
private static class MyThread implements Runnable{
@Override
public void run() {
synchronized (obj){
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
while (true){
System.out.println(Thread.currentThread().getName()+"run at"+dateFormat.format(new Date()));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"正常線程");
Thread thread1 = new Thread(new MyThread(),"死鎖線程");
//開啟線程
thread.start();
TimeUnit.SECONDS.sleep(3);
//暫停線程
thread.suspend();
System.out.println("暫停線程");
thread1.start();
TimeUnit.SECONDS.sleep(3);
}
}
我們來猜猜輸出結(jié)果會(huì)是什么樣呢?來看看:
正常線程run at14:30:42 正常線程run at14:30:43 正常線程run at14:30:44 暫停線程
3s內(nèi)執(zhí)行了三次,在暫停之后,啟動(dòng)thread1后,沒有任何輸出,因?yàn)殒i沒有釋放,thread1無法獲取到鎖,也就無法執(zhí)行run里面的任務(wù),導(dǎo)致死鎖出現(xiàn)。
接著來說說stop,stop會(huì)立即停止run中剩余的操作,會(huì)導(dǎo)致一些收尾工作無法完成,特別是涉及到文件流或數(shù)據(jù)庫關(guān)閉的時(shí)候,試想如果實(shí)在做數(shù)據(jù)同步,突然stop一定會(huì)導(dǎo)致最終數(shù)據(jù)不一致,而最終一致性一詞出現(xiàn)在分布式事務(wù)中。這里就不代碼演示了,只說下解決方法,如果需要平滑一點(diǎn),有好一點(diǎn)的通知線程馬上要中斷了,可以用interrupt(),比如:
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"myThread");
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
?方法上多了一個(gè)打斷的異常,需要的時(shí)候可以利用起來。判斷是否被打斷可以通過isInterrupted(),true為收到中斷消息,false則表示沒收到中斷消息。推薦另一種靜態(tài)方法判斷線程中斷的方式:
Thread.interrupted()
使用更簡潔,不依賴于線程實(shí)例化出來的對象。如果該線程已經(jīng)被添加了中斷標(biāo)識(shí),當(dāng)使用了該方法后,會(huì)將線程的中斷標(biāo)識(shí)由true改為false,可謂是弄巧成拙,另外還需注意,此方法對死鎖下線程無效。?
wait()¬ify()
wait()、notify()、notifyAll()嚴(yán)格意義上來說不能算是線程的方法,他們是定義在Object中的方法,只是可以用來控制線程,但它在使用時(shí)要遵守一定的規(guī)則:必須在線程同步過程中使用,而且必須是使用的同一個(gè)鎖的情況下。我們以一個(gè)簡單的案例來說明:
public class WaitNotify {
static boolean flag = false;
static Object lock = new Object();
//創(chuàng)建等待線程
static class WaitThread implements Runnable{
@Override
public void run() {
synchronized (lock){
while (!flag){
//條件不滿足,進(jìn)入等待
System.out.println(Thread.currentThread().getName()+" flag is false,waiting");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//條件滿足,退出等待
System.out.println(Thread.currentThread().getName()+" flag is true");
}
}
}
static class NotifyThread implements Runnable{
@Override
public void run() {
synchronized (lock){
System.out.println(Thread.currentThread().getName()+" hold lock");
lock.notify();
flag=true;
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new WaitThread(),"wait").start();
TimeUnit.SECONDS.sleep(1);
new Thread(new NotifyThread(),"notify").start();
}
}
查看下輸出就可以知道它的運(yùn)行軌跡:
wait flag is false,waiting notify hold lock wait flag is true?
我們來理一理這個(gè)流程:
- WaitThread先執(zhí)行,并獲取鎖;
- WaitThread在鎖內(nèi)調(diào)用wait方法,代表著放棄鎖,進(jìn)入等待隊(duì)列,狀態(tài)為等待狀態(tài);
- 1s后,NotifyThread執(zhí)行,此時(shí)鎖沒有被持有,那么NotifyThread可以持有鎖,獲取鎖對象;
- NotifyThread在獲取到鎖對象后調(diào)用了notify()或者notifyAll()這倆都行,將WaitThread從等待隊(duì)列中移除,或者叫出來了,放在一個(gè)同步隊(duì)列中,其實(shí)還是WaitThread原本所處的隊(duì)列,也有可能是主線程所在隊(duì)列,但由于NotifyThread此時(shí)仍沒有釋放鎖,所以WaitThread還是阻塞狀態(tài),但馬上就要開始工作了;
- NotifyThread改變了條件,發(fā)了通知,釋放了鎖,WaitThread重新獲取鎖,通過while判斷條件flag為true,跳出等待,執(zhí)行了接下來的輸出。
聽著都覺得精彩,就是不知道你有沒有聽懂啊。聽不懂沒關(guān)系,多看幾遍就懂了。
wait()&sleep()
我們在上面的方法中看到過很多sleep方法,你即使不明其意,但單從英文上也看得出兩者從在的本質(zhì)差別:等待和睡覺。這兩者還是有很大差別的。
從所屬關(guān)系來說,wait方法術(shù)語Object,sleep方法屬于Thread專屬。
從鎖的角度來說,sleep時(shí),我們可以認(rèn)為一切只是暫停了,等到暫停時(shí)間結(jié)束,任務(wù)還是要接著執(zhí)行的,所以sleep時(shí)線程依舊占有鎖,而我們通過wait¬ify可知,wait時(shí)鎖被釋放了。
從執(zhí)行的角度來說,sleep后,只要等待指定的時(shí)間,線程仍可以正常運(yùn)行,它讓出的只是CPU的調(diào)度權(quán),用梁靜茹的話說:還沒有完全交出自己。它的監(jiān)控狀態(tài)依然還保持著,只等時(shí)間一到,就繼續(xù)干活。而wait執(zhí)行后,則是完全交出了自己并處于等待狀態(tài),沒有同一個(gè)對象調(diào)用notify方法,休想繼續(xù)接下來的工作,果然等待才是最沒安全感的啊。
yield()
yield有點(diǎn)神奇,就像是賽跑的時(shí)候,執(zhí)行此方法后代表搶跑了,你就要被叫回來,然后重新開始準(zhǔn)備開跑,但是總有人會(huì)接著搶跑,包括你自己在內(nèi)。執(zhí)行yield的線程會(huì)讓出自己持有的時(shí)間片,讓CPU重新選擇線程執(zhí)行,雖然你讓出了時(shí)間片,但仍有機(jī)會(huì)再下次選擇執(zhí)行線程的時(shí)候被選中,這個(gè)過程是隨機(jī)的。
public class YieldDemo {
static class MyThread implements Runnable{
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" : "+i);
if (i == 2){
System.out.println(Thread.currentThread().getName());
Thread.yield();
}
}
}
}
public static void main(String[] args) {
new Thread(new MyThread()).start();
new Thread(new MyThread()).start();
new Thread(new MyThread()).start();
}
}
這是一個(gè)大家常舉的例子,可以看看,最終的輸出結(jié)果如下:
Thread-1 : 0 Thread-0 : 0 Thread-1 : 1 Thread-0 : 1 Thread-0 : 2 Thread-0 Thread-1 : 2 Thread-1 Thread-2 : 0 Thread-2 : 1 Thread-2 : 2 Thread-2 Thread-0 : 3 Thread-0 : 4 Thread-1 : 3 Thread-1 : 4 Thread-2 : 3 Thread-2 : 4?
每次執(zhí)行結(jié)果都是隨機(jī)的,可以自己試試看。?
join()
join在實(shí)際場景很少使用,目的是保證線程的執(zhí)行順序,讓每一個(gè)線程都持有前一個(gè)線程的引用,實(shí)際中,我們似乎不需要讓線程按照某種我們設(shè)定好的順序來執(zhí)行,主要也看業(yè)務(wù)場景吧。
舉個(gè)例子:
public class JoinDemo {
private static class MyThread extends Thread{
int i;
Thread previousThread; //上一個(gè)線程
public MyThread(Thread previousThread,int i){
this.previousThread=previousThread;
this.i=i;
}
@Override
public void run() {
//調(diào)用上一個(gè)線程的join方法. 不使用join方法解決是不確定的
try {
previousThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("num:"+i);
}
}
public static void main(String[] args) {
Thread previousThread=Thread.currentThread();
for(int i=0;i<10;i++){
//每一個(gè)線程實(shí)現(xiàn)都持有前一個(gè)線程的引用。
MyThread joinDemo=new MyThread(previousThread,i);
joinDemo.start();
previousThread=joinDemo;
}
}
}
在沒有添加join的情況下,線程的執(zhí)行順序必然是隨機(jī)的,而在添加了join后,線程會(huì)依次等待上一個(gè)線程執(zhí)行完成之后收到通知才會(huì)執(zhí)行當(dāng)前線程,可以自己運(yùn)行下代碼,看看join注釋前后代碼的執(zhí)行結(jié)果。
我們把源碼復(fù)制出來:
/**
* Waits for this thread to die.
*
* <p> An invocation of this method behaves in exactly the same
* way as the invocation
*
* <blockquote>
* {@linkplain #join(long) join}{@code (0)}
* </blockquote>
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final void join() throws InterruptedException {
join(0);
}
/**
* Waits at most {@code millis} milliseconds for this thread to
* die. A timeout of {@code 0} means to wait forever.
*
* <p> This implementation uses a loop of {@code this.wait} calls
* conditioned on {@code this.isAlive}. As a thread terminates the
* {@code this.notifyAll} method is invoked. It is recommended that
* applications not use {@code wait}, {@code notify}, or
* {@code notifyAll} on {@code Thread} instances.
*
* @param millis
* the time to wait in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
//判斷是否攜帶阻塞的超時(shí)時(shí)間,等于0表示沒有設(shè)置超時(shí)時(shí)間
if (millis == 0) {
//isAlive獲取線程狀態(tài),無限等待直到previousThread線程結(jié)束
while (isAlive()) {
//調(diào)用Object中的wait方法實(shí)現(xiàn)線程的阻塞
wait(0);
}
} else { //阻塞直到超時(shí)
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
該方法被synchronized修飾,內(nèi)部的阻塞通過wait方法實(shí)現(xiàn),記住一點(diǎn),使用wait方法,必須添加synchronized關(guān)鍵字。
優(yōu)先級(jí)
線程的優(yōu)先級(jí)決定了被執(zhí)行的順序,優(yōu)先級(jí)越高,執(zhí)行的順序就越靠前。通過setPriority()設(shè)置優(yōu)先級(jí),默認(rèn)5,范圍為1-10,話雖這么說,但優(yōu)先級(jí)并非萬能,因?yàn)橛行┫到y(tǒng)可能會(huì)無視優(yōu)先級(jí)的存在,而且優(yōu)先級(jí)高的線程也會(huì)無理的搶占資源導(dǎo)致優(yōu)先級(jí)低的線程遲遲不能執(zhí)行,還是慎用吧。
守護(hù)線程
一提到守護(hù)線程,博主腦海中立刻就跳出來了分布式鎖方案中使用Redis方案下的鎖過期問題,它就是通過守護(hù)線程定期給未釋放的鎖添加過期時(shí)間來解決鎖過期的問題的。
守護(hù)線程是一種支持型的線程,相比而言,我們前面創(chuàng)建的線程都叫做用戶線程,守護(hù)線程創(chuàng)建出來的目的就是為了守護(hù)用戶線程,被守護(hù)的用戶線程結(jié)束,守護(hù)線程也會(huì)結(jié)束,兩者是伴生的。關(guān)于守護(hù)線程,博主不會(huì)講的太詳細(xì),了解下就可以了,如需使用,可自行學(xué)習(xí),好吧,其實(shí)也不難用。加個(gè)線程設(shè)置為守護(hù)線程,然后在run方法中執(zhí)行你想讓它做的事情就可以了,如果需要設(shè)置某時(shí)間做什么事,可在while中通過sleep來實(shí)現(xiàn),可以嘗試自己寫下,博主就不寫了。
狀態(tài)總結(jié)
關(guān)于線程的狀態(tài),我們用一張圖來說明:
看看這些線程執(zhí)行的過程調(diào)用不同的方法是不是進(jìn)入了對應(yīng)的狀態(tài)。 說實(shí)話,寫到這里博主已經(jīng)快要沒有耐心了,而這里也只不過是寫了一小部分而已,路漫漫其修遠(yuǎn)兮,吾將上下而求索。換下個(gè)話題。
內(nèi)置鎖synchronized
關(guān)于synchronized關(guān)鍵字我想大家都不會(huì)陌生,它的出場率還算是蠻高的,但你可能不知道他其實(shí)是線程的內(nèi)置鎖,有內(nèi)置,當(dāng)然也有外置鎖,Java中叫顯示鎖,名為Lock的類。不過不急,我們都會(huì)一一講解到的,下面還是專心來看synchronized的使用。
關(guān)于線程中為什么要用鎖這個(gè)問題,我覺得有必要回答一下,否則鎖的存在毫無意義,因?yàn)槎鄠€(gè)線程相互之間單獨(dú)執(zhí)行是沒有意義的,所以我們需要線程之間互相是能夠協(xié)調(diào)工作的,所以鎖的出現(xiàn)是為了保護(hù)那些被線程訪問的資源的安全性。這個(gè)問題在分布式事務(wù)中尤為重要,這決定我們的數(shù)據(jù)庫數(shù)據(jù)的ACID/CAP,甚至于BASE理論,CP/AP模式等是否成立。不過不用急,這些知識(shí)一個(gè)都跑不了。
基本使用
添加在方法上
//添加在方法上
public synchronized void count(){
count++;
}
添加在方法內(nèi)
//添加同步代碼塊
//需要聲明一個(gè)鎖對象
//作為鎖對象
private Object object = new Object();
public void count(){
synchronized (object){
count++;
}
}
總結(jié)
兩種方式都保證了數(shù)據(jù)的安全性,在多線程下,只有等到獲取到鎖的線程將鎖釋放掉后,下一個(gè)線程才能持有鎖。synchronized就是通過這種機(jī)制來保證數(shù)據(jù)的一致性的。
這是一種對象鎖,要求我們鎖定的必須是同一個(gè)對象,否則將無法生效。
注意:你一定好奇,能加在方法上,方法內(nèi),難道不能加在類上?額~這還真可以,為什么不說呢?那是因?yàn)椴┲鞑幌M銓W(xué)會(huì)往類上加鎖這種方式,類鎖的鎖定范圍太大了,我們在使用的鎖的時(shí)候要堅(jiān)持范圍越小性能越好的原則,不要盲目加鎖。
實(shí)現(xiàn)原理
原理解析
鎖的獲取和釋放我們都已經(jīng)了解了,但它的內(nèi)部究竟是怎么實(shí)現(xiàn)的呢?首先我們創(chuàng)建這樣一個(gè)類:
package com.codingfire.cache;
public class Thr {
public void test2() {
synchronized (this) {
}
}
}
然后通過javac命令編譯生成class文件,接著用javap命令反編譯,得到的信息如下:
從上述內(nèi)容可以看出,同步代碼塊是使用monitorenter和monitorexit指令實(shí)現(xiàn)的。而同步方法的實(shí)現(xiàn)方式這里無法得知,在JVM規(guī)范里也沒有詳細(xì)說明。但有一點(diǎn)我們可以確定,同步方法可以使用這兩個(gè)指令來實(shí)現(xiàn),有這,就夠了。
但是我們還需要去思考一個(gè)問題,synchronized的鎖在哪?經(jīng)過查閱一些資料,我們知道,synchronized的鎖在Java頭中,奶奶的,Java頭又是啥?Java對象在內(nèi)存中由三部分組成:
- Java頭
- MarkWord
- 類型指針
- 數(shù)組長度(只有數(shù)組對象有)
- 實(shí)例數(shù)據(jù)
- 對齊填充字節(jié)
需要說明的是,這是HotSpot JVM下的對象組成,為什么說這個(gè)呢,這里牽涉到一個(gè)jdk7到j(luò)dk8時(shí)jvm中永久帶被取消的原因,其中最重要一點(diǎn)就是HotSpot JVM和JRockit JVM融合,而JRockit沒有永久帶。我只說一遍,記住了哈。
Mark Word用于存儲(chǔ)對象自身的運(yùn)行時(shí)數(shù)據(jù),如哈希碼(HashCode)、GC分代年齡、鎖狀態(tài)標(biāo)志、線程持有的鎖等。到此為止了,后面的就不再說了,說了也記不住,而且沒有實(shí)際意義,徒增煩惱啊。
Synchronized鎖優(yōu)化
你可能沒想到,Synchronized還有被優(yōu)化的時(shí)候,這是源自Java1.6之后,為了減少鎖獲取和釋放帶來的性能問題而做的,主要是引入了偏向鎖和輕量級(jí)鎖的概念,讓鎖的等級(jí)發(fā)生了變化,分別是:無鎖,偏向鎖,輕量級(jí)鎖,重量級(jí)鎖。狀態(tài)間的轉(zhuǎn)換會(huì)隨著競爭情況逐漸升級(jí),鎖可以升級(jí)但不能降級(jí),但偏向鎖降級(jí)為無鎖的情況不包括在內(nèi),這種情況稱之為鎖撤銷,而升級(jí)成為鎖膨脹。
自旋鎖
在鎖的轉(zhuǎn)化過程中,誕生了一種新的鎖:自選鎖,說是鎖,用一種狀態(tài)來說可能更好一些。
自選鎖從阻塞到喚醒需要CPU從用戶態(tài)轉(zhuǎn)換為內(nèi)核態(tài),這個(gè)和iOS中runtime的環(huán)形過程一樣,不過自選鎖可不會(huì)像iOS的運(yùn)行時(shí)那樣會(huì)韜光養(yǎng)晦,在不需要的時(shí)候就沉睡,等待被再次喚醒。自選鎖的出現(xiàn)是因?yàn)殒i狀態(tài)持續(xù)的時(shí)間很短,從線程阻塞到喚醒的切換過程要耗費(fèi)太多的性能,所以才通過自旋來等待鎖的釋放。所謂自旋,就是執(zhí)行一段毫無意義的循環(huán)即可。
注意:自旋不能代替阻塞,不要覺得自旋就沒有性能消耗,自旋的過程消耗的是處理器的性能,如果自旋等待的鎖很快釋放,那樣自旋的效率就很高,反之就很低。
自旋鎖在JDK1.4中引入,默認(rèn)關(guān)閉,但是可以使用-XX:+UseSpinning開啟,在JDK1.6中默認(rèn)開啟。同時(shí)自旋的默認(rèn)次數(shù)為10次,可以通過參數(shù)-XX:PreBlockSpin來調(diào)整。
但有時(shí)候,這樣的次數(shù)很是差強(qiáng)人意,所以就誕生了自適應(yīng)的自旋鎖,它的次數(shù)不再固定,而是由前一次在同一個(gè)鎖上的自旋時(shí)間及鎖的擁有者的狀態(tài)來決定。如果上一次在同一個(gè)鎖上的自旋成功了,那么下次就會(huì)適當(dāng)增加自旋的次數(shù),如果失敗了,就會(huì)適當(dāng)減少自旋的次數(shù)。
鎖撤銷
在鎖升級(jí)的過程中,一般是不會(huì)降級(jí)的,但有一種特殊情況會(huì)出現(xiàn)鎖撤銷,這種情況發(fā)生在編譯期,在此期間,JVM監(jiān)測到不需要發(fā)生競爭的鎖,機(jī)會(huì)通過鎖消除來降低毫無意義的請求鎖的時(shí)間。
比如說下面這個(gè)方法:
public static String getString(String s1, String s2) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
return sb.toString();
}
StringBuffer是作為方法內(nèi)部的局部變量,每次都初始化,因此它不可能被多個(gè)線程同時(shí)訪問,也就沒有資源競爭,但是StringBuffer的append操作卻需要執(zhí)行同步操作,我們看下append源碼:
@Override
public synchronized StringBuffer append(String str) {
toStringCache = null;
super.append(str);
return this;
}
此方法加了synchronized關(guān)鍵字,代表是一個(gè)同步方法,而這完全沒有必要,這就浪費(fèi)了系統(tǒng)資源?,因此在編譯時(shí)一旦JVM發(fā)現(xiàn)此種情況就會(huì)通過鎖消除方式來優(yōu)化性能。在JDK1.8中鎖消除是自動(dòng)開啟的。
這個(gè)東西大家只要知道一下就行,實(shí)際也并不需要我們額外做什么,擴(kuò)充下知識(shí)量就行。
鎖升級(jí)
偏向鎖
偏向鎖加鎖步驟:
- 線程初次執(zhí)行synchronized塊的時(shí)候,通過自旋的方式修改MarkWord鎖標(biāo)記,此時(shí),鎖對象為偏向鎖,這是從無鎖到偏向鎖的一個(gè)轉(zhuǎn)化過程;
- 代碼塊執(zhí)行完畢,鎖不會(huì)釋放,這是因?yàn)槠蜴i的偏向特性,一旦下次來獲取鎖的線程仍然是同一個(gè)線程,那么Java頭中的MarkWord信息就不需要修改了,也就達(dá)成了偏向的目的,而我們這里目的就是要讓它達(dá)成偏向,因?yàn)槠蜴i不需要替換MarkWord的鎖標(biāo)記,執(zhí)行效率是非常高的;
- 所以第二次執(zhí)行同步塊,首先會(huì)判斷MarkWord中的線程ID是否為當(dāng)前線程,如果是,由于之前沒有釋放鎖,這里也就不需要重新加鎖。如果自始至終使用鎖的線程只有一個(gè),很明顯偏向鎖幾乎沒有額外開銷,性能極高,效率會(huì)很高,就印證了上一條;
- 但也有可能,第二次發(fā)現(xiàn)MarkWord中線程ID和當(dāng)前線程不一樣,會(huì)啟動(dòng)CAS操作替換MarkWork中的線程ID。如果替換成功,獲取鎖,執(zhí)行同步代碼塊;如果替換失敗,執(zhí)行下一步;
- 替換失敗,表示有競爭,有另一個(gè)線程搶先獲取了偏向鎖,當(dāng)?shù)竭_(dá)全局安全點(diǎn)(safepoint)時(shí)獲得偏向鎖的線程被掛起,偏向鎖升級(jí)為輕量級(jí)鎖,被阻塞在安全點(diǎn)的線程繼續(xù)往下執(zhí)行同步塊中的代碼,此時(shí)鎖已經(jīng)變?yōu)檩p量級(jí)鎖。
關(guān)于撤銷,博主有幾句話要說,偏向鎖不會(huì)主動(dòng)釋放,只有在替換失敗的情況下,持有偏向鎖的對象才會(huì)在升級(jí)的時(shí)候釋放偏向鎖,偏向鎖的撤銷需要等待全局安全點(diǎn),這個(gè)時(shí)間點(diǎn)上沒有任何的字節(jié)碼執(zhí)行,它先暫停擁有偏向鎖的線程,判斷鎖對象是否處于被鎖定狀態(tài),如果對象處于未鎖定狀態(tài),說明同步塊中代碼已經(jīng)執(zhí)行完畢,相當(dāng)于原有的偏向鎖已經(jīng)過期無效了,此時(shí)該對象就應(yīng)該被直接轉(zhuǎn)換為不可偏向的無鎖狀態(tài),標(biāo)志位為“01”,否則鎖存在,該鎖上升為輕量級(jí)鎖(標(biāo)志位為“00”)的狀態(tài)。
這比較適合始終只有一個(gè)線程在執(zhí)行同步代碼塊,綜上可知,此時(shí)的效率才是最高的。
輕量級(jí)鎖
輕量級(jí)鎖由偏向鎖升級(jí)而來,輕量級(jí)鎖的升級(jí)我覺得很多地方總結(jié)的都不好,理解起來其實(shí)很簡單:就是當(dāng)一個(gè)線程在自旋等待鎖的過程中,又來了一個(gè)線程獲取鎖,那么鎖就會(huì)升級(jí)為重量級(jí)鎖。
這知識(shí)最簡單的說明,但博主覺得簡單粗暴的很,雖然不夠細(xì)節(jié),但重在描述簡潔,了解即可。
解鎖和輕量級(jí)鎖相似,如果下次進(jìn)來的線程鎖記錄和對象頭中的MarkWord能匹配的上,說明是同一個(gè)線程,則執(zhí)行代碼塊,鎖不會(huì)升級(jí),如果不匹配,則線程被掛起,輕量級(jí)鎖升級(jí)為重量級(jí)鎖,輕量級(jí)的鎖被釋放,接著喚醒被掛起的線程,重新爭奪同步塊的鎖。
注意:因?yàn)樽孕僮魇且蕾囉贑PU的,為了避免無用的自旋操作(獲得鎖的線程被阻塞住了),輕量級(jí)鎖一旦升級(jí)為重量級(jí)鎖,就不會(huì)再恢復(fù)到輕量級(jí)鎖。當(dāng)鎖處于重量級(jí)鎖時(shí),其他線程試圖獲取鎖時(shí),都會(huì)被阻塞住。當(dāng)持有鎖的線程釋放鎖之后會(huì)喚醒其他線程再次開始競爭鎖。
重量級(jí)鎖
重量級(jí)鎖通過對象內(nèi)部的監(jiān)視器(monitor)實(shí)現(xiàn),其中monitor的本質(zhì)是依賴于底層操作系統(tǒng)的Mutex Lock實(shí)現(xiàn),操作系統(tǒng)實(shí)現(xiàn)線程之間的切換需要從用戶態(tài)到內(nèi)核態(tài)的切換,切換成本非常高。
?切換成本高的原因在于,當(dāng)系統(tǒng)檢查到是重量級(jí)鎖之后,會(huì)把等待想要獲取鎖的線程阻塞,被阻塞的線程不會(huì)消耗CPU,但是阻塞或者喚醒一個(gè)線程,都需要通過操作系統(tǒng)來實(shí)現(xiàn),也就是相當(dāng)于從用戶態(tài)轉(zhuǎn)化到內(nèi)核態(tài),而轉(zhuǎn)化狀態(tài)是需要消耗時(shí)間的 。
?概括下來就是:競爭失敗后,線程阻塞,釋放鎖后,喚醒阻塞的線程,為了CPU性能,不再使用自旋鎖,將等待同步代碼執(zhí)行完畢后喚醒阻塞的競爭線程展開競爭。所以重量級(jí)鎖適合用在同步塊執(zhí)行時(shí)間比較長的情況下。
總結(jié)
前面的描述總感覺不在重點(diǎn)上,雖然寫了很多,但不好理解。博主覺得下面的總結(jié)可能更容易讓大家看懂,不妨再看看吧:
鎖 | 優(yōu)點(diǎn) | 缺點(diǎn) | 使用場景 |
---|---|---|---|
偏向鎖 | 加鎖和解鎖會(huì)存在CAS,沒有額外的性能消耗,和執(zhí)行非同步方法相比,僅存在納秒級(jí)的差距 | 如果線程間存在鎖競爭,會(huì)帶來額外的鎖撤銷的消耗 | 只有一個(gè)線程訪問同步塊或者同步方法的場景 |
輕量級(jí)鎖 | 競爭的線程不會(huì)阻塞,通過自旋提高程序響應(yīng)速度 | 若線程長時(shí)間搶不到鎖,自旋會(huì)消耗CPU性能 | 追求響應(yīng)時(shí)間。同步代碼塊執(zhí)行非???/td> |
重量級(jí)鎖 | 線程競爭不使用自旋,不消耗CPU | 線程阻塞,響應(yīng)時(shí)間緩慢,在多線程下嗎,頻繁的獲取釋放鎖,會(huì)帶來巨大的性能消耗 | 追求吞吐量,同步塊或者同步方法執(zhí)行時(shí)間較長的場景 |
偏向鎖:在不存在多線程競爭情況下,默認(rèn)會(huì)開啟偏向鎖。
偏向鎖升級(jí)輕量級(jí)鎖:當(dāng)一個(gè)對象持有偏向鎖,一旦第二個(gè)線程訪問這個(gè)對象,如果產(chǎn)生競爭,偏向鎖升級(jí)為輕量級(jí)鎖。
輕量級(jí)鎖升級(jí)重量級(jí)鎖:一般兩個(gè)線程對于同一個(gè)鎖的操作都會(huì)錯(cuò)開,或者說稍微等待一下(自旋),另一個(gè)線程就會(huì)釋放鎖。但是當(dāng)自旋超過一定的次數(shù),或者一個(gè)線程在持有鎖,一個(gè)在自旋,又有第三個(gè)來訪時(shí),輕量級(jí)鎖膨脹為重量級(jí)鎖,重量級(jí)鎖使除了擁有鎖的線程以外的線程都阻塞,防止CPU空轉(zhuǎn)。
半路吐槽
臥槽,實(shí)在扛不住了,本想一次性寫完,但感覺沒個(gè)一星期以上寫不完的節(jié)奏,準(zhǔn)備分成幾篇一起發(fā)。整體上,掌握同步塊和一些工具類的使用就很OK了,很多文字描述都是晦澀的,真的不好懂,博主也不敢說能完全搞懂,但本著挖穿JUC祖墳的念頭,咱們深啃一下,爭取搞個(gè)七七八八,了卻一樁心事。但為了此系列不能毀在我第一篇博客手里,博主決定繼續(xù)寫吧,爭取5w字以內(nèi)寫完。
死鎖
你要是不知道什么是死鎖,那博主可就沒辦法了,關(guān)于什么是死鎖,怎么造成的,我本來想給你們省略的,想了想,還是簡單寫寫吧,你們可別嫌棄我寫的簡單?。?/p>
死鎖及其原因
一個(gè)線程申請資源時(shí),恰好沒有可用的資源,此時(shí)需要的資源正被其他線程占有,如果被占有的資源永遠(yuǎn)不會(huì)釋放,那么就會(huì)造成死鎖。
舉個(gè)例子:線程1使用A資源,線程2使用B資源,A資源中要使用B,B資源要使用A,此時(shí)他們分別被線程1和線程2持有,都沒執(zhí)行完就不會(huì)釋放,所以只能相互等待,造成死鎖。
造成死鎖的原因是因?yàn)楸舜硕疾豢戏艞壸约旱馁Y源,就像是鷸蚌相爭,最終都沒有好下場,哈哈哈哈?。?!
排查死鎖
排查死鎖的方式有很多種,說實(shí)話,博主也很少用,我相信大家都用的不會(huì)太多,這里我就不去亂教大家了,有需要自己查吧,咱們也別五十步笑百步,博主就提供下幾個(gè)工具和思路表示下歉意:
- 通過JDK工具jps+jstack
- 通過JDK工具jconsole
- 通過JDK工具VisualVM
避免死鎖
避免死鎖其實(shí)很簡單,細(xì)心一點(diǎn)就不會(huì)出問題,比如:
- 避免一個(gè)線程獲取多個(gè)鎖;
- 避免一個(gè)線程占用多個(gè)資源;
- 可以考慮使用一些工具類,因?yàn)榉奖?,還不用擔(dān)心性能問題,思索問題等,盡量不直接使用內(nèi)置鎖,這些工具類后面會(huì)講到。
CAS
其實(shí)前面在寫同步塊的時(shí)候就提到過CAS,當(dāng)時(shí)沒有介紹這是什么,現(xiàn)在大家可以來看看了。CAS(Compare and Swap),即比較并替換,是用于實(shí)現(xiàn)多線程同步的原子操作。
所謂原子操作,是不會(huì)被線程調(diào)度打斷的操作,只要開始,就會(huì)一直運(yùn)行到結(jié)束,中間不會(huì)有任何的切換或停頓。這不由使我想到了ACID,這是數(shù)據(jù)庫事物的特性,其中A就是Atomicity,代表事物一旦開始就不會(huì)中斷,要么全部成功,要么全部失敗。分布式事務(wù)后面也會(huì)挖墳,大家敬請期待。
實(shí)現(xiàn)原子性需要鎖,但是同步塊synchronized基于阻塞,在使用上算是粒度比較大的比較粗糙的,使用它的線程在獲取到鎖后,其他來訪問的線程需要等待,直到鎖釋放,這對于一些簡單的需求顯得過于笨重,還有一些譬如死鎖問題,競爭問題,等待問題不好解決。
說了這么多synchronized的壞話,帶我們也不能抹殺它的存在,它還是有用的。關(guān)于它的使用我們可以回過頭去上看再看看。
CAS原理
CAS是compare and swap的縮寫,從字面來看,比較并交換,淺顯一點(diǎn),這就是遠(yuǎn)離了,但肯定不會(huì)是單純的比較交換,否則也沒必要說了。
CAS操作過程都包含三個(gè)運(yùn)算符:內(nèi)部地址V、期望值A(chǔ)、新值B。這讓我想到一些系統(tǒng)方法里面就是這三個(gè)參數(shù),比如,哎呀,突然想不起來了,最近還用過的,似乎是MybatisPlus里用的,不糾結(jié)了,大家可以去翻我SSM系列的內(nèi)容看看。
使用這三個(gè)參數(shù)時(shí),當(dāng)操作的這個(gè)內(nèi)存地址V上存放的值等于期望值A(chǔ),則將內(nèi)存地址上的值修改為新值B,否則不做任何操作。常見的CAS循環(huán)其實(shí)就是在一個(gè)循環(huán)里不斷的做CAS操作,直到成功為止。有自旋那味兒了。
CAS對于線程安全層面是沒有顯示處理的,而是在CPU和內(nèi)存中完成的,充分利用了多核的特性,實(shí)現(xiàn)了硬件層面的阻塞,再加上volatile關(guān)鍵字就可以實(shí)現(xiàn)原子上的線程安全了。
悲觀鎖&樂觀鎖
說到CAS,不得不說的就是悲觀鎖和樂觀鎖。和人的性格一樣,這種同名鎖就像是悲觀的人和樂觀的人一樣,下面我們來看看他們具體悲觀在哪里,樂觀在哪里。
?悲觀鎖總是假設(shè)會(huì)出現(xiàn)最壞的情況,說好聽點(diǎn)叫未雨綢繆。每次去獲取數(shù)據(jù)時(shí),都會(huì)認(rèn)為其他線程會(huì)修改,總有刁民想害朕?所以每次在獲取數(shù)據(jù)時(shí)都會(huì)上鎖,當(dāng)其他線程去拿到這個(gè)數(shù)據(jù)就會(huì)阻塞,直到鎖被釋放。在關(guān)系型數(shù)據(jù)庫中就大量應(yīng)用了這種鎖機(jī)制,如行鎖、表鎖、讀鎖、寫鎖,他們都是在操作前先上鎖,剛剛學(xué)的synchronized就是一個(gè)最好的例子。
樂觀鎖總是假設(shè)一直都是最好的情況,有點(diǎn)僥幸心理,每次獲取時(shí)都認(rèn)為其他線程不會(huì)修改,所以不會(huì)上鎖,但是在更新時(shí)會(huì)判斷在此期間別人有沒有更新這個(gè)數(shù)據(jù),可以使用基于條件或者版本號(hào)的方式實(shí)現(xiàn)。樂觀鎖適用于讀多寫少的場景,可以提升系統(tǒng)吞吐量,而CAS就是一種樂觀鎖的實(shí)現(xiàn)。?
CAS的問題
任何東西都是把雙刃劍,有好處,自然就有弊端,它最大的三個(gè)問題:ABA、循環(huán)時(shí)間開銷大、只能保證一個(gè)共享變量的原子操作。
ABA
CAS原理我們已經(jīng)講過了,從圖中我們基本能明白它的步驟,這也符合樂觀鎖的使用方式。但是你有沒有想過一個(gè)問題,假如我第一次把A修改成了B,第二次再把B修改成了A,那么我到底修改了嗎?我可以負(fù)責(zé)任的告訴你,CAS認(rèn)為沒有修改,但實(shí)際已經(jīng)發(fā)生兩次修改了。
這就是著名的ABA問題,解決這個(gè)問題也很簡單,添加判斷條件,如版本號(hào)來解決。這在分布式事務(wù)中對數(shù)據(jù)庫的優(yōu)化上比較常見。
循環(huán)時(shí)間開銷大
上圖中我們也看到了,一旦比較失敗,會(huì)進(jìn)行無休止的自旋,如果自旋時(shí)間過長,會(huì)給CPU帶來巨大性能開銷。假如有高并發(fā)情況出現(xiàn),這種自旋將吃掉幾乎所有的CPU性能,電腦就卡死機(jī)了。
只能保證一個(gè)共享變量的原子操作
CAS工作原理圖中,我們看到內(nèi)存中存儲(chǔ)著舊值,但是也只能存儲(chǔ)一個(gè)值,這也是為什么它只能保證一個(gè)變量的原子操作,多個(gè)變量時(shí),無法全部保存就無法保證多變量時(shí)的原子操作。如果要使用多個(gè)變量的時(shí)候,可以通過存儲(chǔ)引用類型,也就是我們所謂的對象,把多個(gè)變量包裝進(jìn)去就可以解決這個(gè)問題。
原子操作類
原子操作提供了一些基于原子性操作的工具類,但說實(shí)話,實(shí)際應(yīng)用中, 恐怕他們出場機(jī)會(huì)微乎其微,除了增改需要保證原子性,刪除和查詢是不需要的,而我們大多數(shù)的請求都是基于查詢的,即使我們增改真的要保證原子性,也多的是辦法,這些東西博主還真沒怎么用過,不知大小伙伴們用過沒?下面,我們一起來看看都有什么吧。
基本類型
AtomicInteger、AtomicBoolean、AtomicLong,這三個(gè)類的方法基本相同,以AtomicInteger為例:
int addAndGet():以原子的方式將輸入的數(shù)字與AtomicInteger里的值相加,并返回結(jié)果:
private static AtomicInteger atomicInteger = new AtomicInteger(0);
public static Integer addAndGetDemo(int value){
return atomicInteger.addAndGet(value);
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Integer result = addAndGetDemo(i);
System.out.println(result);
}
}
boolean compareAndSet(int expect, int update):如果輸入的數(shù)值等于預(yù)期值,則以原子方式將該值設(shè)置為輸入的值:
public static boolean compareAndSetDemo(int expect,int update){
return atomicInteger.compareAndSet(expect, update);
}
/*System.out.println(compareAndSetDemo(1,1));*/
就不一一列出來了,真要是需要用到了,單獨(dú)去查吧,估計(jì)用的會(huì)很少。
數(shù)組
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray幾個(gè)類中的方法幾乎一樣,只是操作的數(shù)據(jù)類型不同,以AtomicIntegerArray中的API為例:
?一些方法解釋如下:
//執(zhí)行加法,第一個(gè)參數(shù)為數(shù)組的下標(biāo),第二個(gè)參數(shù)為增加的數(shù)量,返回增加后的結(jié)果
int addAndGet(int i, int delta)
//對比修改,參1數(shù)組下標(biāo),參2原始值,參3修改目標(biāo)值,成功返回true否則false
boolean compareAndSet(int i, int expect, int update)
//參數(shù)為數(shù)組下標(biāo),將數(shù)組對應(yīng)數(shù)字減少1,返回減少后的數(shù)據(jù)
int decrementAndGet(int i)
// 參數(shù)為數(shù)組下標(biāo),將數(shù)組對應(yīng)數(shù)字增加1,返回增加后的數(shù)據(jù)
int incrementAndGet(int i)
//和addAndGet類似,區(qū)別是返回值是變化前的數(shù)據(jù)
int getAndAdd(int i, int delta)
//和decrementAndGet類似,區(qū)別是返回變化前的數(shù)據(jù)
int getAndDecrement(int i)
//和incrementAndGet類似,區(qū)別是返回變化前的數(shù)據(jù)
int getAndIncrement(int i)
// 將對應(yīng)下標(biāo)的數(shù)字設(shè)置為指定值,第一個(gè)參數(shù)數(shù)組下標(biāo),第二個(gè)參數(shù)為設(shè)置的值,返回是變化前的數(shù)據(jù)
getAndSet(int i, int newValue)
我們來寫個(gè)小案例:
public class AtomicIntegerArrayDemo {
static int[] value = new int[]{1,2,3};
static AtomicIntegerArray array = new AtomicIntegerArray(value);
public static void main(String[] args) {
System.out.println(array.getAndSet(2,6));
System.out.println(array.get(2));
System.out.println(value[2]);
}
}
輸出結(jié)果如下:
3 22 3
你會(huì)發(fā)現(xiàn)AtomicIntegerArray獲取的值與原傳入數(shù)組的值不同,這是因?yàn)閿?shù)組是通過構(gòu)造方法傳遞,然后AtomicIntegerArray會(huì)將當(dāng)前傳入數(shù)組復(fù)制一份,當(dāng)AtomicIntegerArray對內(nèi)部數(shù)組元素進(jìn)行修改時(shí),不會(huì)影響原數(shù)組,哇!好秀??!。
引用類型
引用類型就是我們所說的對象了,關(guān)于引用類型需要使用Atomic包中的三個(gè)類,分別為:AtomicReference(用于原子更新引用類型)、AtomicMarkableReference(用于原子更新帶有標(biāo)記位的引用類型)、AtomicStampedReference(用于原子更新帶有版本號(hào)的引用類型)。
public class AtomicReferenceTest {
static class User{
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public User(String name, int age) {
this.name = name;
this.age = age;
}
}
public static AtomicReference<User> atomicReference = new AtomicReference<>();
public static void main(String[] args) {
User u1 = new User("張三",18);
User u2 = new User("李四",19);
atomicReference.set(u1);
atomicReference.compareAndSet(u1,u2);
System.out.println(atomicReference.get().getName());
System.out.println(atomicReference.get().getAge());
}
}
AtomicMarkableReference可以用于解決CAS中的ABA的問題:
public static void main(String[] args) throws InterruptedException {
User u1 = new User("張三", 22);
User u2 = new User("李四", 33);
//只有true和false兩種狀態(tài)。相當(dāng)于未修改和已修改
//構(gòu)造函數(shù)出傳入初始化引用和初始化修改標(biāo)識(shí)
AtomicMarkableReference<User> amr = new AtomicMarkableReference<>(u1,false);
//在進(jìn)行比對時(shí),不僅比對對象,同時(shí)還會(huì)比對修改標(biāo)識(shí)
//第一個(gè)參數(shù)為期望值
//第二個(gè)參數(shù)為新值
//第三個(gè)參數(shù)為期望的mark值
//第四個(gè)參數(shù)為新的mark值
System.out.println(amr.compareAndSet(u1,u2,false,true));
System.out.println(amr.getReference().getName());
}
AtomicStampedReference會(huì)是基于版本號(hào)思想解決ABA問題的,因?yàn)槠鋬?nèi)部維護(hù)了一個(gè)Pair對象,Pair對象記錄了對象引用和時(shí)間戳信息,實(shí)際使用的時(shí)候,要保證時(shí)間戳唯一,如果時(shí)間戳重復(fù),還會(huì)出現(xiàn)ABA的問題。
我們來看看這個(gè)Pair:
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
/**
* Creates a new {@code AtomicStampedReference} with the given
* initial values.
*
* @param initialRef the initial reference
* @param initialStamp the initial stamp
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
從這段代碼,我們看到Pair里面帶著的時(shí)間戳 ,根據(jù)時(shí)間戳做對比,比如ABA中,多次修改,值相同,時(shí)間戳肯定變了,所以比對后,肯定會(huì)發(fā)現(xiàn)被修改了,就能解決ABA問題。
我們以兩個(gè)線程為例寫個(gè)Demo:
package com.codingfire.cache;
import java.util.concurrent.atomic.AtomicStampedReference;
public class Thr {
private static final Integer INIT_NUM = 1000;
private static final Integer UPDATE_NUM = 100;
private static final Integer TEM_NUM = 200;
private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(INIT_NUM, 1);
public static void main(String[] args) {
new Thread(() -> {
int value = (int) atomicStampedReference.getReference();
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + " : 當(dāng)前值為:" + value + " 版本號(hào)為:" + stamp);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(atomicStampedReference.compareAndSet(value, UPDATE_NUM, stamp, stamp + 1)){
System.out.println(Thread.currentThread().getName() + " : 當(dāng)前值為:" + atomicStampedReference.getReference() + " 版本號(hào)為:" + atomicStampedReference.getStamp());
}else{
System.out.println("版本號(hào)不同,更新失??!");
}
}, "線程A").start();
new Thread(() -> {
// 確保線程A先執(zhí)行
Thread.yield();
int value = (int) atomicStampedReference.getReference();
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + " : 當(dāng)前值為:" + value + " 版本號(hào)為:" + stamp);
System.out.println(Thread.currentThread().getName() +" : "+atomicStampedReference.compareAndSet(atomicStampedReference.getReference(), TEM_NUM, stamp, stamp + 1));
System.out.println(Thread.currentThread().getName() + " : 當(dāng)前值為:" + atomicStampedReference.getReference() + " 版本號(hào)為:" + atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() +" : "+atomicStampedReference.compareAndSet(atomicStampedReference.getReference(), INIT_NUM, stamp, stamp + 1));
System.out.println(Thread.currentThread().getName() + " : 當(dāng)前值為:" + atomicStampedReference.getReference() + " 版本號(hào)為:" + atomicStampedReference.getStamp());
}, "線程B").start();
}
}
這個(gè)Demo中初始值1000,線程A先執(zhí)行,輸出信息,線程B后執(zhí)行,yield就是為了保證線程A先執(zhí)行,線程A執(zhí)行到sleep后進(jìn)入休眠,線程B開始執(zhí)行compareAndSet操作,第一次修改成功,值200,第二次stamp值已經(jīng)變了,修改回原值1000,修改失敗了,第三次就是線程A中的判斷,為false,版本號(hào)變更,修改失敗。除非我們保持時(shí)間戳不變,否則是無法修改成功的,但是成功了,就會(huì)導(dǎo)致ABA問題,如果不考慮ABA,那也就沒有使用這種方式的必要了。
以上代碼可以自己輸出下看看是不是博主說的步驟。
更新操作
使用原子更新某字段時(shí),就要使用更新字段類,Atomic包下提供了3個(gè)類,AtomicIntegerFieldUpdater(原子更新整型字段)、AtomicLongFieldUpdater(原子更新長整型字段)、AtomicReferenceFieldUpdater(原子更新引用類型字段)
private static AtomicIntegerFieldUpdater<User> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
public static void main(String[] args) {
User user = new User("張三",18);
System.out.println(fieldUpdater.getAndIncrement(user));
System.out.println(fieldUpdater.get(user));
}
這個(gè)就很簡單了,用來更新就行了,用起來很神奇,getAndIncrement后,age+1,自己試試看。它的方法還有很多,大家可以自己去發(fā)現(xiàn)并嘗試。
版本差異
1.8之后,Java新增了幾個(gè)原子類:
- LongAdder:長整型原子類
- DoubleAdder:雙浮點(diǎn)型原子類
- LongAccumulator:類似LongAdder,但要更加靈活(要傳入一個(gè)函數(shù)式接口)
- DoubleAccumulator:類似DoubleAdder,但要更加靈活(要傳入一個(gè)函數(shù)式接口)
AtomicLong已經(jīng)通過CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器來說性能已經(jīng)很好了,但LongAdder才是目前的天花板,LongAdder的特點(diǎn)在于多線程并發(fā)的情況下依然保持良好的性能,解決了AtomicLong的高并發(fā)瓶頸。
我們用一張圖來說明下:
也就是說,?LongAdder會(huì)把競爭的值分為多份,讓同樣多的線程去競爭多個(gè)資源那么性能問題就解決了,聽上去有點(diǎn)不可思議,但事實(shí)就是如此。其工作原理如下:
當(dāng)沒有出現(xiàn)多線程競爭的情況,線程會(huì)直接對初始value進(jìn)行修改,當(dāng)多線程的時(shí)候,那么LongAdder會(huì)初始化一個(gè)cell數(shù)組,然后對每個(gè)線程獲取對應(yīng)的hash值,之后通過hash & (size -1)[size為cell數(shù)組的長度]將每個(gè)線程定位到對應(yīng)的cell單元格,之后這個(gè)線程將值寫入對應(yīng)的cell單元格中的value,最后再將所有cell單元格的value和初始value進(jìn)行累加求和得到最終的值。并且每個(gè)線程競爭的Cell的下標(biāo)不是固定的,如果CAS失敗,會(huì)重新獲取新的下標(biāo)去更新,從而極大地減少了CAS失敗的概率。
顯示鎖
顯示鎖基礎(chǔ)
顯示鎖在說內(nèi)置鎖的時(shí)候就已經(jīng)提到過,顯示鎖就是明著加鎖和解鎖操作,這是因?yàn)閟ynchronized的使用方式比較固定,只能加在固定的地方,而我們需要根據(jù)業(yè)務(wù)自己控制的時(shí)候synchronized顯然不是那么的方便,所以就出現(xiàn)了按照程序猿主觀思想來加鎖的顯示鎖,顯示鎖中其提供了三個(gè)很常見方法:lock()、unLock()、tryLock()。
其標(biāo)準(zhǔn)用法如下:
//加鎖
lock.lock();
//業(yè)務(wù)邏輯
try{
i++;
}finally{
//解鎖
lock.unLock();
}
加鎖的過程不能寫在try中,否則一旦發(fā)生異常,鎖將被釋放。 最后在finally塊中釋放鎖,這是保證在獲取到鎖之后,最終鎖能夠被釋放。如果你了解seata里的TCC模式,那你一定知道有個(gè)叫事務(wù)懸掛和空回滾的東西,很相似。后期的博客我會(huì)介紹道。
那如何選擇使用synchronized還是Lock?呢?這要看具體的業(yè)務(wù)場景,如果不需要考慮鎖中斷取消的情況,synchronized無疑是更好的選擇,因?yàn)楝F(xiàn)在的JDK中對于synchronized的優(yōu)化是很多的,比如剛剛學(xué)過的鎖優(yōu)化升級(jí),如果想要更多的主動(dòng)權(quán),就選擇Lock。
ReentrantLock
Lock本身是一個(gè)接口,基于這個(gè)接口,可以實(shí)現(xiàn)各式各樣的鎖,ReentrantLock就是其中使用比較頻繁的一個(gè)鎖。
基本使用
public class LockTest extends Thread{
private static int count = 100000;
private static int value = 0;
private static Lock lock = new ReentrantLock();
@Override
public void run() {
for (int i = 0; i < count; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+" : "+value);
value++;
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
LockTest l1 = new LockTest();
LockTest l2 = new LockTest();
l1.start();
l2.start();
TimeUnit.SECONDS.sleep(5);
System.out.println(value);
}
}
加鎖和解鎖的注意事項(xiàng)我們在上面已經(jīng)提到過,此處不再贅述。
可重入
可重入感覺就像是你穿了一件T桖后,還能再穿一件外套,你脫衣服的時(shí)候,也需要先脫外套才能脫T桖,穿衣服是加鎖,脫衣服是解鎖,這樣說,你能明白嗎?
ReentrantLock也會(huì)把它稱之為可重入鎖,這是一種遞歸無阻塞的同步機(jī)制。它可以等同于synchronized的使用,但是ReentrantLock提供了比synchronized更強(qiáng)大、靈活的鎖機(jī)制,可以減少死鎖發(fā)生的概率。
? 其內(nèi)部實(shí)現(xiàn)流程為:
- 每個(gè)鎖關(guān)聯(lián)一個(gè)線程持有者和計(jì)數(shù)器,當(dāng)計(jì)數(shù)器為0時(shí)表示該鎖沒有被任何線程持有,線程都會(huì)可獲得該鎖;
- 當(dāng)某個(gè)線程請求鎖成功后,JVM會(huì)記錄鎖的持有線程,并將計(jì)數(shù)器置為1,此時(shí)其他線程請求獲取鎖,就必須等待;
- 當(dāng)持有鎖的線程再次請求這個(gè)鎖,就可以再次拿到這個(gè)鎖,同時(shí)計(jì)數(shù)器再次遞增,這就是重入了
- 當(dāng)持有鎖的線程退出同步代碼塊時(shí),計(jì)數(shù)器遞減,當(dāng)計(jì)數(shù)器減到0時(shí),釋被放鎖
synchronized的重入:
public class SynDemo {
public static synchronized void lock1(){
System.out.println("lock1");
lock2();
}
public static synchronized void lock2(){
System.out.println("lock2");
}
public static void main(String[] args) {
new Thread(){
@Override
public void run() {
lock1();
}
}.start();
}
}
ReentrantLock的重入:
public class ReentrantTest {
private static Lock lock = new ReentrantLock();
private static int count = 0;
public static int getCount() {
return count;
}
public void test1(){
lock.lock();
try {
count++;
test2();
}finally {
lock.unlock();
}
}
public void test2(){
lock.lock();
try {
count++;
}finally {
lock.unlock();
}
}
static class MyThread implements Runnable{
private ReentrantTest reentrantTest;
public MyThread(ReentrantTest reentrantTest) {
this.reentrantTest = reentrantTest;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
reentrantTest.test1();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantTest reentrantTest = new ReentrantTest();
new Thread(new MyThread(reentrantTest)).start();
TimeUnit.SECONDS.sleep(2);
System.out.println(count);
}
}
多次加鎖不被阻塞就是可重入鎖的基本特征。
公平鎖&非公平鎖
公不公平的判定標(biāo)準(zhǔn)是等待時(shí)間,比如排隊(duì)買東西,誰排在前面誰先買,同理就是誰等的時(shí)間久誰先買,要分個(gè)先來后到。
如果硬要問,誰的效率高,那一定是非公平鎖的效率高,想想我們生活中的案例,競爭才能讓人變得優(yōu)秀,才能提高產(chǎn)能,鎖也不例外。但光這么說并沒有什么說服力,那就說點(diǎn)靠譜的:主要是因?yàn)楦偁幊浞掷昧薈PU,減少了線程喚醒上下文切換的時(shí)間。
ReentrantLock開啟公平鎖的方式:
//開啟公平鎖
private Lock lock = new ReentrantLock(true);
如果不傳入?yún)?shù),那就是非公平鎖本鎖了。
最后關(guān)于內(nèi)置鎖和顯示鎖的對比,你們覺得還有必要說嗎?我知道大家懶,還是總結(jié)下吧:
相同點(diǎn):
?都以阻塞性方式進(jìn)行加鎖同步,當(dāng)一個(gè)線程獲得了對象鎖,執(zhí)行同步代碼塊,則其他線程要訪問都要阻塞等待,直到獲取鎖的線程釋放鎖才能繼續(xù)獲取鎖。
不同點(diǎn):
- 對于Synchronized來說,它是java語言的關(guān)鍵字,是原生語法層面的互斥,需要jvm實(shí)現(xiàn)。而ReentrantLock它是JDK 1.5之后提供的API層面的互斥鎖,需要lock()和unlock()方法配合try/finally語句塊來完成;
- Synchronized的使用比較方便簡潔,并且由編譯器去保證鎖的加鎖和釋放,而ReenTrantLock需要手工聲明來加鎖和釋放鎖,為了避免忘記手工釋放鎖造成死鎖,所以最好在finally中聲明釋放鎖。
綜上可知,ReenTrantLock的鎖粒度和靈活度要優(yōu)于Synchronized。
ReentrantReadWriteLock
了解讀寫鎖
ReenTrantLock還有一個(gè)變種鎖ReentrantReadWriteLock,從表述來看就是多了讀寫的功能,沒錯(cuò),ReentrantReadWriteLock內(nèi)部維護(hù)了一對鎖:讀鎖,寫鎖。
在當(dāng)下讀寫分離的大開發(fā)環(huán)境下,對讀寫分別加鎖的場景就是和于ReentrantReadWriteLock,關(guān)于讀操作是否需要加鎖的問題我們先保留,這是因?yàn)樗墓ぷ鞣绞轿覀冞€不了解,繼續(xù)往下看。
ReentrantReadWriteLock的主要特征就是:
- 讀操作不互斥,寫操作互斥,讀和寫互斥;
- 支持公平性和非公平性;
- 支持鎖重入;
- 寫鎖能夠降級(jí)成為讀鎖,遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序。讀鎖不能升級(jí)為寫鎖。
哎~看到?jīng)]?問題解決沒?不多說了啊。
實(shí)現(xiàn)原理
剛剛說過,ReentrantReadWriteLock內(nèi)部維護(hù)了一對讀寫鎖,類中定義了幾個(gè)核心方法,readLock()返回用于讀操作的鎖,writeLock()返回用于寫操作的鎖。writeLock()用于獲取寫鎖,readLock()用于獲取讀鎖。
通過一個(gè)簡單的for循環(huán)案例來演示下:
package com.codingfire.cache;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Thr {
private static int count = 0;
private static class WriteDemo implements Runnable{
ReentrantReadWriteLock lock ;
public WriteDemo(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.writeLock().lock();
count++;
System.out.println("寫鎖: "+count);
lock.writeLock().unlock();
}
}
}
private static class ReadDemo implements Runnable{
ReentrantReadWriteLock lock ;
public ReadDemo(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.readLock().lock();
count++;
System.out.println("讀鎖: "+count);
lock.readLock().unlock();
}
}
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
WriteDemo writeDemo = new WriteDemo(lock);
ReadDemo readDemo = new ReadDemo(lock);
//運(yùn)行多個(gè)寫線程,不會(huì)重復(fù),證明寫互斥
//運(yùn)行多個(gè)讀線程,可能重復(fù),證明讀不互斥
//同時(shí)運(yùn)行,讀鎖和寫鎖后面不會(huì)出現(xiàn)重復(fù)的數(shù)字,證明讀寫互斥
for (int i = 0; i < 3; i++) {
new Thread(writeDemo).start();
}
for (int i = 0; i < 3; i++) {
new Thread(readDemo).start();
}
}
}
查看運(yùn)行輸出:
寫鎖: 1 寫鎖: 2 寫鎖: 3 讀鎖: 4 讀鎖: 6 讀鎖: 5 寫鎖: 7 寫鎖: 8 寫鎖: 9 寫鎖: 10 寫鎖: 11 寫鎖: 12 寫鎖: 13 寫鎖: 14 寫鎖: 15 寫鎖: 16 寫鎖: 17 寫鎖: 18
從結(jié)果來看,符合我們對它的描述。能不符合嗎?官方爸爸都這么說。
鎖降級(jí)
讀寫鎖是支持鎖降級(jí)的,但不支持鎖升級(jí)。寫鎖可以被降級(jí)為讀鎖,但讀鎖不能被升級(jí)寫鎖。這我們在前面就已經(jīng)說過了,但這里降級(jí)的概念該怎么理解呢?這么說:獲取到了寫鎖的線程能夠再次獲取到同一把鎖的讀鎖。因?yàn)槠鋬?nèi)部有兩把鎖,它當(dāng)然可以獲取另一把讀鎖,這就是寫鎖降級(jí)為讀鎖。
我們來看看下面的案例:
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public void fun1(){
//獲取寫鎖
lock.writeLock().lock();
System.out.println("fun1");
fun2();
lock.writeLock().unlock();
}
public void fun2(){
//獲取讀鎖
lock.readLock().lock();
System.out.println("fun2");
lock.readLock().unlock();
}
}
public static void main(String[] args) {
new Demo().fun1();
}
輸出:
fun1 fun2
說明我們的理論是正確的,但還沒有驗(yàn)證寫鎖升級(jí)讀鎖,可以試試,還是上面的案例,調(diào)換下順序:
private static class Demo{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public void fun1(){
//獲取寫鎖
lock.writeLock().lock();
System.out.println("fun1");
//fun2();
lock.writeLock().unlock();
}
public void fun2(){
//獲取讀鎖
lock.readLock().lock();
System.out.println("fun2");
fun1();
lock.readLock().unlock();
}
}
public static void main(String[] args) {
new Demo().fun2();
}
輸出:
fun2
只輸出了fun2,fun1沒有輸出,說明無法獲取到同一把鎖的寫鎖。
總結(jié)
最后,關(guān)于ReentrantReadWriteLock鎖什么時(shí)候用,和ReentrantLock一樣,只有當(dāng)高并發(fā)條件下才適合使用,其效率非常高,單線程或線程很少的情況下,沒有什么差別,甚至還可能由于結(jié)構(gòu)更復(fù)雜導(dǎo)致效率不如synchronized同步塊。
LockSupport
LockSupport是一個(gè)工具類,其內(nèi)部定義了一組公共靜態(tài)方法,通過這些方法可以對線程進(jìn)行阻塞和喚醒功能,LockSupport也是構(gòu)建同步組件的基礎(chǔ)工具。
?LockSupport定義了一組以park開頭的方法用來阻塞當(dāng)前線程,并以u(píng)npark來喚醒一個(gè)被阻塞的線程。
這部分內(nèi)容了解即可,你可能整個(gè)生涯都用不到。
AQS
AQS(AbstractQueuedSynchronizer),即隊(duì)列同步器。它是構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),是JUC并發(fā)包中的核心基礎(chǔ)組件。它維護(hù)了一個(gè) volatile 的 state 和一個(gè) CLH(FIFO)雙向隊(duì)列。
我覺得這玩意你得了解下,雖然我也很不喜歡這部分內(nèi)容,但它確實(shí)重要,為什么說重要呢?因?yàn)樗荝eentrantLock、ReentrantReadWriteLock、Semaphore這些東西的基礎(chǔ),這些東西有多重要,但對于我們開發(fā)者而言,這部分內(nèi)容,我們大致了解它的工作方式,設(shè)計(jì)模式即可,畢竟我們真正要用只是它的上層封裝工具類。這些類我們已經(jīng)講過ReentrantLock、ReentrantReadWriteLock,其他的后面也會(huì)有的。
CLH隊(duì)列
?CLH隊(duì)列鎖即Craig, Landin, and Hagersten (CLH) locks。這是三個(gè)人的名字。 同時(shí)它也是現(xiàn)在PC機(jī)內(nèi)部對于鎖的實(shí)現(xiàn)機(jī)制。我們正說的AQS就是基于CLH隊(duì)列鎖的一種變體實(shí)現(xiàn)。
CLH隊(duì)列鎖是一種基于單向鏈表的可擴(kuò)展的公平的自旋鎖,這個(gè)title可真多,它申請線程僅在本地變量上自旋,通過輪詢前驅(qū)鎖的狀態(tài)決定是否繼續(xù)輪詢或者獲取鎖。你現(xiàn)在可能不明白這種結(jié)構(gòu),沒關(guān)系,博主通過幾個(gè)狀態(tài)和圖來說明下。
單線程狀態(tài)
多線程狀態(tài)
自旋狀態(tài)
獲取到鎖狀態(tài)
總結(jié)
簡單描述就是:
- 當(dāng)有一個(gè)線程的時(shí)候,隊(duì)列沒什么意義;
- 第二個(gè)線程來了之后,第一個(gè)線程的節(jié)點(diǎn)鎖被獲取變?yōu)閠rue,第二個(gè)節(jié)點(diǎn)指向第一個(gè)節(jié)點(diǎn);
- 第二個(gè)節(jié)點(diǎn)不斷自旋,以獲取第一個(gè)節(jié)點(diǎn)的鎖狀態(tài);
- 一旦第一個(gè)節(jié)點(diǎn)的任務(wù)執(zhí)行完,鎖被釋放,第二個(gè)節(jié)點(diǎn)獲取到鎖,第一個(gè)節(jié)點(diǎn)就會(huì)從隊(duì)列中被刪除。?
設(shè)計(jì)模式
AQS是一個(gè)抽象類,這個(gè)沒什么可爭議的,子類通過繼承它,并實(shí)現(xiàn)其抽象方法。不知道你注意沒,ReentrantLock、ReentrantReadWriteLock這兩個(gè)剛學(xué)過的類就是如此:
從圖中可以看出,他們兩個(gè)并沒有直接繼承AQS,而是在其內(nèi)部擴(kuò)展了靜態(tài)內(nèi)部類來繼承AQS。 這么做的原因,就是想通過區(qū)分使用者和實(shí)現(xiàn)者,來讓使用者可以更加方便的完成對鎖的操作。
鎖面向使用者,其定義了鎖與使用者的交互實(shí)現(xiàn)方式,同時(shí)隱藏了實(shí)現(xiàn)細(xì)節(jié)。AQS面向的是鎖的實(shí)現(xiàn)者,其內(nèi)部完成了鎖的實(shí)現(xiàn)方式。通過這種方式,可以通過區(qū)分鎖和同步器讓使用者和實(shí)現(xiàn)者能夠更好的關(guān)注各自的領(lǐng)域。?
博主覺得吧,大家聽聽就好,知道這么回事就行,莫深究,越挖越深,吃不消了都。
實(shí)現(xiàn)思路
其實(shí)博主對設(shè)計(jì)模式不是很在行,但是看到AQS是用了模版設(shè)計(jì)模式,一開始還非常好奇,細(xì)看代碼才發(fā)現(xiàn),什么模版設(shè)計(jì)模式,這不就是個(gè)工廠類嗎?這種設(shè)計(jì)模式在我們的開發(fā)中大量運(yùn)用,比如JDBCTemplate、RedisTemplate、RabbitTemplate等等都在用,我們自己有時(shí)候也會(huì)用。
就好比我們定一個(gè)抽象的動(dòng)物類,并有bark,eat,sleep三個(gè)抽象方法,子類非別是people,dog,cat,分別繼承了動(dòng)物類,然后各自重寫自己的bark,eat,sleep類。
AQS使用的模版方法大致有三類:
- xxSharedxx:共享式獲取與釋放,如讀鎖;
- acquire:獨(dú)占式獲取與釋放,如寫鎖;
- 查詢同步隊(duì)列中等待線程情況。
AQS對于鎖的操作是通過同步狀態(tài)切換來完成的,其有一個(gè)變量state,我們上面提到過,這個(gè)狀態(tài)用來表示鎖的狀態(tài),state>0時(shí)表示當(dāng)前已有線程獲取到了資源,當(dāng)state = 0時(shí)表示釋放了資源。
注意:多線程下,一定會(huì)有多個(gè)線程來同時(shí)修改state變量,所以在AQS中也提供了一些方法能夠安全的對state值進(jìn)行修改,那就是CAS,我們在子類中找一下有沒有這樣的方法,在ReentrantLock源碼中找到了這個(gè):
點(diǎn)進(jìn)去的類是AbstractQueuedSynchronizer,在里面找到了找一下這個(gè)方法:
果然被我找到了,CAS真是把利器,不得不服,簡直無處不在,特別是你在做分布式相關(guān)的東西時(shí),只要你肯挖,多半都有它的影子。
AQS原理
原理
前面提到過AQS是基于CLH隊(duì)列鎖的來實(shí)現(xiàn)的,其內(nèi)部不同于CLH的單向鏈表,使用二十的雙向鏈表。對于一個(gè)隊(duì)列來說,其內(nèi)部一定會(huì)通過一個(gè)節(jié)點(diǎn)來保存線程信息,如:前驅(qū)節(jié)點(diǎn)、后繼節(jié)點(diǎn)、當(dāng)前線程節(jié)點(diǎn)、線程狀態(tài)這些信息,CLH的隊(duì)列圖我們已經(jīng)畫過了,相信大家都很了解了,AQS內(nèi)部同樣定義了一個(gè)這樣的Node對象用于存儲(chǔ)這些信息。
總結(jié)下來就是:
?兩種線程等待模式:
- SHARED:表示線程以共享模式等待鎖,如讀鎖。
- EXCLUSIVE:表示線程以獨(dú)占模式等待鎖,如寫鎖。
五種線程狀態(tài):
- 初始Node對象時(shí),默認(rèn)值為0。
- CANCELLED:表現(xiàn)線程獲取鎖的請求已經(jīng)取消,值為1。
- SINNAL:表現(xiàn)線程已經(jīng)準(zhǔn)備就緒,等待鎖空閑給我,值為-1。
- CONDITION:表示線程等待某一個(gè)條件被滿足,值為-2。
- PROPAGETE:當(dāng)線程處于SHARED模式時(shí),該狀態(tài)才會(huì)生效,用于表示線程可以被共享傳????????????????????????播,值為-3。
五個(gè)成員變量:
- waitStatus:表示線程在隊(duì)列中的狀態(tài),值對應(yīng)上述五種線程狀態(tài)。
- prev:表示當(dāng)前線程的前驅(qū)節(jié)點(diǎn)。
- next:表示當(dāng)前線程的后繼節(jié)點(diǎn)。
- thread:表示當(dāng)前線程。
- nextWaiter:表示等待condition條件的節(jié)點(diǎn)。
同時(shí)在AQS中還存在兩個(gè)成員變量,head和tail,分別代表隊(duì)頭節(jié)點(diǎn)和隊(duì)尾節(jié)點(diǎn)
。
說了這么多,但整這些博主可是記不住,但是其運(yùn)行模式我們要清楚,節(jié)點(diǎn)怎么工作也要了解。
下面,我們來圖來加深對它的結(jié)構(gòu)的理解。
整體結(jié)構(gòu)圖
?新增一個(gè)節(jié)點(diǎn)時(shí):
釋放鎖后刪除頭節(jié)點(diǎn):
總結(jié)?
看到圖,我相信總結(jié)已經(jīng)不需要再寫什么東西了,大家看完圖對它的工作狀態(tài)和結(jié)構(gòu)已經(jīng)很清晰了,真的不需要記住全部,這三張圖能記住就足夠了。?
再次吐槽
第二天晚上了,目前已經(jīng)寫了3w+的字,還沒有寫完,名天這時(shí)候可能差不多了吧,今天寫的內(nèi)容比較晦澀,其實(shí)說實(shí)話,博主寫完也未必能再復(fù)述下來,但總結(jié)好總歸是隨用隨取,很多東西開發(fā)中根本不會(huì)涉及那么底層,但我們還是要明白其工作原理,這對我們開發(fā)有好處。還是洗洗睡吧,來日再戰(zhàn)。
Fork/Join分解合并框架
什么是Fork/Join框架
Fork/Join框架是jdk1.7開始提供的一個(gè)并行任務(wù)框架,可以在不去了解Thread、Runnable等相關(guān)知識(shí)的情況下,只要遵循fork/join開發(fā)模式,就完成寫出很好的多線程并發(fā)任務(wù),可以說簡化了多線程的開發(fā)步驟。
它的主要思想是分而治之,把一個(gè)大任務(wù)分成若干小份,分別執(zhí)行,最后再匯聚成一份,得到大任務(wù)結(jié)果。所以理解起來一很容易,簡單來說分為兩步:
第一步,分割任務(wù),并行執(zhí)行;
第二步,合并子任務(wù)結(jié)果,得到最終結(jié)果。
為了更好的表示這個(gè)過程,我們畫個(gè)圖,我先找找,看能不能找到一個(gè)合適的圖,不行就自己畫,最終還是自己畫:
從圖中你可以看到一點(diǎn),任務(wù)的劃分并不是均勻的。
工作竊取算法
按照上圖,每一個(gè)小任務(wù)最終都會(huì)存在于一個(gè)任務(wù)隊(duì)列中,說到這里,我覺得有必要再畫個(gè)圖了,上圖不足以描述隊(duì)列中的任務(wù):
假如說有這兩個(gè)線程,當(dāng)線程1中的任務(wù)率先執(zhí)行完畢,線程1將從線程2中取出沒有執(zhí)行的任務(wù)放到自己的線程隊(duì)列中執(zhí)行。利用自己的閑置時(shí)間去執(zhí)行其他線程的任務(wù),能夠減少線程阻塞或是閑置的時(shí)間,提高 CPU 利用率?。這就是工作竊取算法的核心。
Fork/Join基本使用
使用前瞻
Fork/Join的任務(wù)叫ForkJoinTask,F(xiàn)orkJoinTask的執(zhí)行需要一個(gè)pool。ForkJoinTask的內(nèi)部包含了fork和join的操作機(jī)制,開發(fā)者在使用的時(shí)候不需要直接繼承ForkJoinTask,而是繼承它的子類:
- RecursiveAction:返回沒有結(jié)果的任務(wù)。
- RecursiveTask:返回有結(jié)果的任務(wù)。
工作流程:
- 新建ForkJoinPool;
- 新建ForkJoinTask(RecursiveAction/RecursiveTask);
- 在任務(wù)的compute方法,根據(jù)自定義條件進(jìn)行任務(wù)拆分,如果條件滿足則執(zhí)行任務(wù),如果條件不滿足則繼續(xù)拆分任務(wù),當(dāng)所有任務(wù)都執(zhí)行完,進(jìn)行最終結(jié)果的合并;
- 通過get或join獲取最終結(jié)果。
同步有結(jié)果值
//forkJoin累加
public class RecursiveTest{
//自定義任務(wù)
private static class SumTask extends RecursiveTask<Integer> {
private final static int THRESHOLD=5;
private int[] src;
private int fromIndex;
private int endIndex;
public SumTask(int[] src, int fromIndex, int endIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.endIndex = endIndex;
}
@Override
protected Integer compute() {
//判斷是否符合任務(wù)大小
if (endIndex-fromIndex<THRESHOLD){
//符合條件
int count = 0;
for (int i = fromIndex; i <= endIndex; i++) {
count+=src[i];
}
return count;
}else {
//繼續(xù)拆分任務(wù)
//基于二分查找對任務(wù)進(jìn)行拆分
int mid = (fromIndex+endIndex)/2;
SumTask left = new SumTask(src,fromIndex,mid);
SumTask right = new SumTask(src,mid+1,endIndex);
invokeAll(left,right);
return left.join()+right.join();
}
}
}
public static void main(String[] args) {
int[] numArray = new int[]{1,23,15,45,145,456,21,3,55,22,77,44,33,22,90,12,46,78};
ForkJoinPool pool = new ForkJoinPool();
SumTask sumTask = new SumTask(numArray,0,numArray.length-1);
long start = System.currentTimeMillis();
pool.invoke(sumTask);
System.out.println("spend time: "+(System.currentTimeMillis()-start));
System.out.println("sum = " + sum);
}
}
不是說所有的這種操作都要使用fork/join,fork/join適合運(yùn)算量比較大的操作,它內(nèi)部會(huì)利用CPU多核特性,結(jié)合線程上下文切換,所以肯定是有消耗的,和傳統(tǒng)的for循環(huán)便利相比,數(shù)據(jù)量少肯定性能不如它。
異步無結(jié)果值
package com.codingfire.cache;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class FindFile extends RecursiveAction {
private File path;
public FindFile(File path) {
this.path = path;
}
@Override
protected void compute() {
List<FindFile> takes = new ArrayList<>();
//獲取指定路徑下的所有文件
File[] files = path.listFiles();
if (files != null){
for (File file : files) {
//是否為文件夾
if (file.isDirectory()){
//遞歸調(diào)用
takes.add(new FindFile(file));
}else {
//不是文件夾。執(zhí)行檢查
if (file.getAbsolutePath().endsWith("vue")){
System.out.println(file.getAbsolutePath());
}
}
}
//調(diào)度所有子任務(wù)執(zhí)行
if (!takes.isEmpty()){
for (FindFile task : invokeAll(takes)){
//阻塞當(dāng)前線程并等待獲取結(jié)果
task.join();
}
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
FindFile task = new FindFile(new File("/Users/Codeliu/Desktop/vue"));
pool.submit(task);
//主線程join,等待子任務(wù)執(zhí)行完畢。
task.join();
System.out.println("task end");
}
}
執(zhí)行結(jié)果會(huì)把vue結(jié)尾的文件全部打印出來,有結(jié)果無結(jié)果主要看你繼承的什么類,執(zhí)行的是什么任務(wù),compute里怎么寫,不要過多糾結(jié),知道怎么用就行。
fork/join總結(jié)
博主覺得fork/join暫時(shí)可能給大家講不明白,這里只簡單的涉及下,后期還會(huì)單獨(dú)出一篇博客來說明,因?yàn)橛泻芏嗌婕霸创a的地方還需要再考慮考慮,特別是join的流程,invoke和invokeAll的區(qū)別,fork和invoke關(guān)系等,都存在很大的疑點(diǎn),想要段時(shí)間搞明白恐怕不易,有點(diǎn)后悔博客寫太長了,還是應(yīng)該分成幾篇循序漸進(jìn)啊,這篇還是硬著頭皮繼續(xù)寫,后續(xù)內(nèi)容看情況,實(shí)在不行就分開寫,要不然寫的太長,內(nèi)容太多,很多東西涉及了,但卻講的不深會(huì)被大家詬?。耗氵@還挖墳?zāi)兀康仄ざ紱]揭起來吧?哎~心好累,寫了好幾天了都。
并發(fā)的工具類
JDK并發(fā)包下提供了幾個(gè)很有用的并發(fā)工具類:CountDownLatch、CyclicBarrier、Semaphore、Exchanger,通過他們可以在不同場景下完成一些特定的功能,AQS里有提到其中的一些,畢竟隊(duì)列同步器是構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架。
CountDownLatch
CountDownLatch是閉鎖,它允許一個(gè)或多個(gè)線程等待其他線程完成工作,其內(nèi)部通過計(jì)數(shù)器實(shí)現(xiàn),當(dāng)執(zhí)行到某個(gè)節(jié)點(diǎn)時(shí),開始等待其他任務(wù)執(zhí)行,每完成一個(gè),計(jì)數(shù)器減1,當(dāng)計(jì)數(shù)器等于0時(shí),代表任務(wù)已全部完成,恢復(fù)之前等待的線程并繼續(xù)向下執(zhí)行。
CountDownLatch的一個(gè)典型使用場景就是解析一個(gè)多sheet的Excel文件,等到解析完所有sheet后,再進(jìn)行后續(xù)操作。
舉個(gè)例子:
package com.codingfire.cache;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
static CountDownLatch countDownLatch = new CountDownLatch(5);
//任務(wù)線程
private static class TaskThread implements Runnable{
@Override
public void run() {
countDownLatch.countDown();
System.out.println("task thread is running");
}
}
//等待線程
private static class WaitThread implements Runnable{
@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("wait thread is running");
}
}
public static void main(String[] args) throws InterruptedException {
//等待線程執(zhí)行
for (int i = 0; i < 2; i++) {
new Thread(new WaitThread()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(new TaskThread()).start();
}
TimeUnit.SECONDS.sleep(3);
}
}
輸出結(jié)果:
task thread is running task thread is running task thread is running task thread is running task thread is running wait thread is running wait thread is running
普通任務(wù)執(zhí)行結(jié)束后,CountDownLatch任務(wù)才執(zhí)行,不管你設(shè)置1個(gè)還是多個(gè),都是如此。博主覺得通過文字描述和案例理解起來更容易,這個(gè)圖畫出來反倒不好理解了,所以就不畫了。
CycliBarrier
CycliBarrier是同步屏障,當(dāng)一組任務(wù)執(zhí)行時(shí),第一個(gè)任務(wù)到達(dá)屏障點(diǎn)開始等待,直到最后一個(gè)任務(wù)到達(dá)屏障點(diǎn),屏障解除,任務(wù)可以繼續(xù)向下執(zhí)行,它內(nèi)部是基于計(jì)數(shù)器思想實(shí)現(xiàn)的。此處可以有圖:
代碼實(shí)現(xiàn):
public class CyclicBarrierDemo {
static CyclicBarrier barrier = new CyclicBarrier(3);
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+": do somethings");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":continue somethings");
}).start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+": do somethings");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":continue somethings");
}).start();
//主線程
try {
System.out.println(Thread.currentThread().getName()+": do somethings");
barrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":continue somethings");
}
}
輸出:
Thread-0: do somethings main: do somethings Thread-1: do somethings Thread-1:continue somethings Thread-0:continue somethings main:continue somethings?
屏障點(diǎn)之后主線程繼續(xù)執(zhí)行,在此之前,不必關(guān)心哪個(gè)先哪個(gè)后。?
CycliBarrier的構(gòu)造函數(shù)不僅可以傳入需要等待的線程數(shù),同時(shí)還可以傳入一個(gè)Runnable,對于這個(gè)傳入的Runnable,可以作為一個(gè)擴(kuò)展任務(wù)來使用。
看案例:
public class CyclicBarrierExtendDemo {
static CyclicBarrier barrier = new CyclicBarrier(3,new ExtendTask());
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+": do somethings");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":continue somethings");
}).start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+": do somethings");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":continue somethings");
}).start();
//主線程
try {
System.out.println(Thread.currentThread().getName()+": do somethings");
barrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":continue somethings");
}
static class ExtendTask implements Runnable{
@Override
public void run() {
System.out.println("extend task running");
}
}
}
輸出:
Thread-0: do somethings main: do somethings Thread-1: do somethings extend task running Thread-1:continue somethings Thread-0:continue somethings main:continue somethings
?到達(dá)屏障點(diǎn)后,先執(zhí)行擴(kuò)展任務(wù),然后才繼續(xù)執(zhí)行剩下的任務(wù),增強(qiáng)了自主性,可以個(gè)性化功能了。
不知道你發(fā)現(xiàn)沒,CyclicBarrier是固定線程數(shù),CountDownLatch則沒有這個(gè)限制,可多可少。另外,CountDownLatch的await 阻塞工作線程,所有準(zhǔn)備執(zhí)行的線程都要執(zhí)行countDown來減少計(jì)數(shù)器的值,CyclicBarrier是通過自身的await來阻塞線程,兩者有本質(zhì)區(qū)別,都仔細(xì)看看。
最后再爆一嘴,這個(gè)功能和iOS多線程GCD里面的柵欄功能類似,只是柵欄也不限定線程的數(shù)量。
Semaphore
Semaphore是信號(hào)量,好巧不巧的,iOS也有信號(hào)量,而且功能還類似,天下語言一家親啊。信號(hào)量主要做流量控制,比如說1000個(gè)人要進(jìn)入展廳參觀,但是每次最多只能進(jìn)100個(gè),100是最大載客量,那就只能在門口安排一個(gè)工作人員來數(shù)數(shù),第一次達(dá)到100個(gè)后咔的拉上警戒線,其他人都在外面等著,但是這100人不必同時(shí)離開,當(dāng)出一個(gè)人時(shí),入口就放進(jìn)來1個(gè)人,只要保證展廳最多只能存在100人即可,但不必一定達(dá)到100,要看進(jìn)出的速度,也就是任務(wù)執(zhí)行的速度。
看代碼理解下:
package com.codingfire.cache;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
private static final int THREAD_COUNT=20;
private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
executorService.execute(()->{
try {
//獲取資源
semaphore.acquire();
System.out.println("進(jìn)入");
//釋放資源
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
輸出有點(diǎn)長,就不貼出來了,也沒什么意義,要注意,獲取資源后執(zhí)行任務(wù),在后面要對任務(wù)進(jìn)行釋放,釋放任務(wù)的契機(jī)可以是執(zhí)行結(jié)束后,也可以是其他,要看具體業(yè)務(wù),但同意之間,最多只能有指定數(shù)量的任務(wù)進(jìn)入。
Exchanger
Exchanger是交換器,它是一個(gè)線程協(xié)作工具類,可以進(jìn)行線程間的數(shù)據(jù)交換,但僅限兩個(gè)線程間。它提供了一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)。
來看個(gè)案例:
package com.codingfire.cache;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setA = new HashSet<String>();//存放數(shù)據(jù)的容器
try {
setA.add("a1");
setA = exchange.exchange(setA);//交換set
/*處理交換后的數(shù)據(jù)*/
System.out.println(Thread.currentThread().getName()+" : "+setA.toString());
} catch (InterruptedException e) {
}
}
},"setA").start();
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setB = new HashSet<String>();//存放數(shù)據(jù)的容器
try {
/*添加數(shù)據(jù)
* set.add(.....)
* set.add(.....)
* */
setB.add("b1");
setB = exchange.exchange(setB);//交換set
/*處理交換后的數(shù)據(jù)*/
System.out.println(Thread.currentThread().getName()+" : "+setB.toString());
} catch (InterruptedException e) {
}
}
},"setB").start();
}
}
執(zhí)行輸出:
setB : [a1] setA : [b1]?
可以看到,數(shù)據(jù)已經(jīng)交換了。
隊(duì)列
隊(duì)列分為阻塞和非阻塞兩種,有一張表想分享給大家:
隊(duì)列類別 | 阻塞 | 有界 | 線程安全 | 場景 | 注意事項(xiàng) |
---|---|---|---|---|---|
ConcurrentLinkedQueue | 非阻塞 | 無界 | CAS | 操作全局集合 | size() 要遍歷一遍集合,慎用 |
ArrayBlockingQueue | 阻塞 | 有界 | 一把全局鎖 | 生產(chǎn)消費(fèi)模型,平衡兩邊處理速度 | |
LinkedBlockingQueue | 阻塞 | 可配置 | 存取采用2把鎖 | 生產(chǎn)消費(fèi)模型,平衡兩邊處理速度 | 無界的時(shí)候注意內(nèi)存溢出 |
PriorityBlockingQueue | 阻塞 | 無界 | 一把全局鎖 | 支持優(yōu)先級(jí)排序 | |
SynchronousQueue | 阻塞 | 無界 | CAS | 不存儲(chǔ)元素的阻塞隊(duì)列 |
ConcurrentLinkedQueue
簡介
ConcurrentLinkedQueue是非阻塞隊(duì)列,在單線程中,經(jīng)常會(huì)用到一些例如ArrayList,HashMap的集合,但他們都不是線程安全的,其中Vector算是線程安全的,但它的做法過于簡單粗暴,就是直接在方法上添加synchronized同步塊作為獨(dú)占鎖,將原本的多線程串行化,ArrayList同樣也可以這么做,這種方式效率很低,明顯不是我們想要的結(jié)果,這時(shí)候ConcurrentLinkedQueue隊(duì)列就派上用場了。
ConcurrentLinkedQueue是一個(gè)基于鏈接節(jié)點(diǎn)的無邊界的線程安全隊(duì)列,它遵循隊(duì)列的FIFO原則,隊(duì)尾入隊(duì),隊(duì)首出隊(duì),采用CAS算法來實(shí)現(xiàn)這一點(diǎn)。
使用
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) {
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
//offer(E e),add(E e)都是將指定元素插入隊(duì)列的尾部
queue.offer("java");
queue.add("iOS");
System.out.println("offer后,隊(duì)列是否空?" + queue.isEmpty());
//peek()獲取但不移除此隊(duì)列的頭,如果隊(duì)列為空,則返回null
System.out.println("從隊(duì)列中peek:" + queue.peek());
//poll()獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回null
System.out.println("從隊(duì)列中poll:" + queue.poll());
System.out.println("pool后,隊(duì)列是否空?" + queue.isEmpty());
//remove():從隊(duì)列中刪除指定元素
System.out.println("從隊(duì)列中remove元素:"+queue.remove("iOS"));
}
}
上面提到過,ConcurrentLinkedQueue的size方法會(huì)遍歷集合,很慢,慎用!??!queue.size()>0用 !queue.isEmpty()替代。ConcurrentLinkedQueue本身并不能保證線程安全,還需要自己進(jìn)行同步或加鎖操作,區(qū)別在于,我們之前保證的是線程安全,現(xiàn)在保證的是隊(duì)列安全,主要是防止其他線程操作隊(duì)列。
BlockingQueue
BlockingQueue是阻塞隊(duì)列,當(dāng)隊(duì)列滿的時(shí)候入,當(dāng)隊(duì)列空的時(shí)候出都會(huì)造成阻塞。我們在上面已經(jīng)知道了,阻塞隊(duì)列共有四種,下面我們來分別介紹這四種隊(duì)列。
ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)由數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,該隊(duì)列采用FIFO的原則對元素進(jìn)行排序添加。其大小在構(gòu)造時(shí)由構(gòu)造函數(shù)來決定,確認(rèn)之后就不能再改變。這和我們普通數(shù)組也是一樣的。
ArrayBlockingQueue可以選擇公平還是不公平的訪問策略,公平性通常會(huì)降低吞吐量,但是減少了可變性,避免了“不平衡性”。內(nèi)部使用了可重入鎖ReentrantLock + Condition來完成多線程環(huán)境的并發(fā)操作。
它的使用場景適合于多線程處理某個(gè)任務(wù),但對順序有要求。它可以一邊多線程的處理數(shù)據(jù),一邊多線程的保存數(shù)據(jù),而且順序不會(huì)亂,比如入站口高并發(fā)的人臉識(shí)別,都需要提交到服務(wù)器處理,不可能一次搞一個(gè)吧?肯定是多個(gè)設(shè)備同時(shí)工作,服務(wù)器假設(shè)只有一臺(tái),我們希望它可以更高效率的執(zhí)行驗(yàn)證的部分,就可以用到這個(gè)隊(duì)列,看下案例:
package com.codingfire.cache;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ArrayBlockingQueueDemo {
//最大容量為5的數(shù)組阻塞隊(duì)列
private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5, true);
public static void main(String[] args) {
Thread t1 = new Thread(new ProducerTask());
Thread t2 = new Thread(new ConsumerTask());
//啟動(dòng)線程
t1.start();
t2.start();
}
//生產(chǎn)者
static class ProducerTask implements Runnable {
private Random rnd = new Random();
@Override
public void run() {
try {
while (true) {
int value = rnd.nextInt(100);
//如果queue容量已滿,則當(dāng)前線程會(huì)堵塞,直到有空間再繼續(xù)
queue.put(value);
System.out.println("生產(chǎn)者:" + value);
TimeUnit.MILLISECONDS.sleep(100); //線程休眠
}
} catch (Exception e) {
}
}
}
//消費(fèi)者
static class ConsumerTask implements Runnable {
@Override
public void run() {
try {
while (true) {
//如果queue為空,則當(dāng)前線程會(huì)堵塞,直到有新數(shù)據(jù)加入
Integer value = queue.take();
System.out.println("消費(fèi)者:" + value);
TimeUnit.MILLISECONDS.sleep(15); //線程休眠
}
} catch (Exception e) {
}
}
}
}
這個(gè)隊(duì)列最大處理量為5,有空閑就會(huì)放新的數(shù)據(jù)進(jìn)來,生產(chǎn)者記錄人臉信息,消費(fèi)者負(fù)責(zé)比對,順序是一樣的,保證不會(huì)出錯(cuò)。自己運(yùn)行下看看吧,其源碼分析可以自己去查下,網(wǎng)上博主看到有很多。
LinkedBlockingQueue
LinkedBlockingQueue和ArrayBlockingQueue的使用方式基本一樣,區(qū)別如下:
- 隊(duì)列的數(shù)據(jù)結(jié)構(gòu)不同
- ArrayBlockingQueue是一個(gè)由數(shù)組支持的有界阻塞隊(duì)列
- LinkedBlockingQueue是一個(gè)基于鏈表的有界(可設(shè)置)阻塞隊(duì)列
-
隊(duì)列大小初始化方式不同
- ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中必須指定隊(duì)列的大小,這和數(shù)組一樣
- LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中可以不指定隊(duì)列的大小,但是默認(rèn)是Integer.MAX_VALUE
- 隊(duì)列中鎖實(shí)現(xiàn)不同
- ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中鎖沒有分離,生產(chǎn)和消費(fèi)用共同一個(gè)鎖
- LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中鎖是分離的,生產(chǎn)用putLock,消費(fèi)用takeLock
-
生產(chǎn)或消費(fèi)時(shí)操作不同
- ArrayBlockingQueue隊(duì)列在生產(chǎn)和消費(fèi)的時(shí)候,是直接將對象插入或移除的
- LinkedBlockingQueue隊(duì)列在生產(chǎn)和消費(fèi)的時(shí)候,是先將對象轉(zhuǎn)換為Node,再進(jìn)行插入或移除,多了這一步會(huì)影響性能
其實(shí)這些大家根據(jù)數(shù)組和鏈表的特征基本上是可以找到不同點(diǎn)的。
PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)優(yōu)先級(jí)隊(duì)列,內(nèi)部使用一個(gè)獨(dú)占鎖來控制,同時(shí)只有一個(gè)線程可以進(jìn)行入隊(duì)和出隊(duì),它是無界的,就是說向Queue里面增加元素沒有數(shù)量限制,但可能會(huì)導(dǎo)致內(nèi)存溢出而失敗。
PriorityBlockingQueue如其名,始終保證出隊(duì)的元素是優(yōu)先級(jí)最高的元素,并且優(yōu)先級(jí)規(guī)則可以定制。內(nèi)部通過一個(gè)可擴(kuò)容數(shù)組保存元素,規(guī)則是:當(dāng)前元素個(gè)數(shù)>=最大容量時(shí)候會(huì)通過算法擴(kuò)容,為了避免在擴(kuò)容操作時(shí)其他線程不能進(jìn)行出隊(duì)操作,會(huì)先釋放鎖,然后通過CAS保證同一時(shí)間只有一個(gè)線程可以擴(kuò)容成功。
PriorityBlockingQueue不允許空值,而且不支持non-comparable(不可比較)的對象,優(yōu)先隊(duì)列的頭是基于自然排序或Comparator排序的最小元素,如果有多個(gè)對象擁有同樣的排序,那么就隨機(jī)地取其中任意一個(gè),也可以通過Comparator(比較器)在隊(duì)列實(shí)現(xiàn)自定義排序。當(dāng)獲取隊(duì)列時(shí),將返回隊(duì)列的頭對象。他也是無界的,初始化時(shí)可設(shè)置大小,但隨著添加會(huì)自動(dòng)擴(kuò)容。
SynchronousQueue
SynchronousQueue不存儲(chǔ)元素,而是維護(hù)一組線程用于數(shù)據(jù)的入隊(duì)和出隊(duì),所以嚴(yán)格意義上不是一個(gè)真正的隊(duì)列。正因?yàn)槿绱?,put和take會(huì)一直阻塞,直到有另一個(gè)線程已經(jīng)準(zhǔn)備好要處理數(shù)據(jù)。
SynchronousQueue使用直接交付的方式,將更多關(guān)于任務(wù)狀態(tài)的信息反饋給生產(chǎn)者,當(dāng)交付被接受時(shí),它就知道消費(fèi)者已經(jīng)得到了任務(wù),而不是簡單地把任務(wù)放入一個(gè)隊(duì)列不管是否被隊(duì)列拿到。
SynchronousQueue默認(rèn)使用非公平排序,也可設(shè)置公平排序。但公平所構(gòu)造的隊(duì)列使線程以 FIFO 的順序進(jìn)行訪問,通常會(huì)降低吞吐量,好處是可以減小可變性并避免得不到服務(wù)。
SynchronousQueue特點(diǎn):
- 是一種阻塞隊(duì)列,其中每個(gè) put 必須等待一個(gè) take,反之亦然。同步隊(duì)列沒有任何內(nèi)部容量,甚至連一個(gè)隊(duì)列的容量都沒有。
- 是線程安全的,是阻塞的。
- 不允許使用 null 元素。
- 公平排序策略是指調(diào)用put的線程之間,或take的線程之間的線程以 FIFO 的順序進(jìn)行訪問。
SynchronousQueue的方法:
- iterator(): 永遠(yuǎn)返回空,因?yàn)槔锩鏇]東西。
- peek() :永遠(yuǎn)返回null。
- put() :往queue放進(jìn)去一個(gè)element以后就一直wait直到有其他thread進(jìn)來把這個(gè)element取走。
- offer() :往queue里放一個(gè)element后立即返回,如果碰巧這個(gè)element被另一個(gè)thread取走了,offer方法返回true,認(rèn)為offer成功;否則返回false。
- offer(2000, TimeUnit.SECONDS) :往queue里放一個(gè)element但等待時(shí)間后才返回,和offer()方法一樣。
- take() :取出并且remove掉queue里的element,取不到東西他會(huì)一直等。
- poll() :取出并且remove掉queue里的element,方法立即能取到東西返回。否則立即返回null。
- poll(2000, TimeUnit.SECONDS) :等待時(shí)間后再取,并且remove掉queue里的element,
- isEmpty():永遠(yuǎn)是true。
- remainingCapacity() :永遠(yuǎn)是0。
- remove()和removeAll() :永遠(yuǎn)是false。
使用案例:
package com.codingfire.cache;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
new Thread(new Product(queue)).start();
new Thread(new Customer(queue)).start();
}
static class Product implements Runnable {
SynchronousQueue<Integer> queue;
Random r = new Random();
public Product(SynchronousQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
int number = r.nextInt(1000);
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("等待2秒后發(fā)送" + number);
queue.put(number);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
static class Customer implements Runnable {
SynchronousQueue<Integer> queue;
public Customer(SynchronousQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
System.out.println("接收到num:" + queue.take());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
輸出:
等待2秒后發(fā)送310 接收到num:310 等待2秒后發(fā)送382 接收到num:382 等待2秒后發(fā)送897 接收到num:897 等待2秒后發(fā)送898 接收到num:898 等待2秒后發(fā)送774 接收到num:774 等待2秒后發(fā)送60 接收到num:60 等待2秒后發(fā)送532 接收到num:532 等待2秒后發(fā)送773 接收到num:773 ......
感覺上和ConcurrentLinkedQueue一樣,差別是,SynchronousQueue是線程安全的,是阻塞的,ConcurrentLinkedQueue隊(duì)列內(nèi)雖然也是線程安全的,但我們要放著其他線程同時(shí)操作這個(gè)隊(duì)列。
ThreadPoolExecutor線程池
在介紹Fork/Join的時(shí)候我們提到,F(xiàn)orkJoinTask的執(zhí)行時(shí)需要一個(gè)ForkJoinPool,這是一個(gè)類似線程池的東西,但和Java線程池有區(qū)別,雖然都是用來管理線程的,但ForkJoinPool的線程池內(nèi)的線程都對應(yīng)一個(gè)任務(wù)隊(duì)列(WorkQueue),隊(duì)列可能有多個(gè),工作線程優(yōu)先處理來自自身隊(duì)列的任務(wù)(LIFO或FIFO順序,參數(shù) mode 決定),然后以FIFO的順序隨機(jī)竊取其他隊(duì)列中的任務(wù)。Java中的線程池則是一個(gè)沒有感情的調(diào)度機(jī)器,按照規(guī)章制度辦事,什么規(guī)章制度呢?如下:
- 任務(wù)提交到線程池后,如果當(dāng)前線程數(shù)小于核心線程數(shù),就創(chuàng)建線程并執(zhí)行,不會(huì)銷毀原來的線程,直到達(dá)到核心線程數(shù);
- 當(dāng)核心線程都在執(zhí)行還有任務(wù)提交時(shí),任務(wù)放在阻塞隊(duì)列中等待線程執(zhí)行完之后再來執(zhí)行隊(duì)列中的任務(wù);
- 當(dāng)阻塞隊(duì)列也滿了后,繼續(xù)創(chuàng)建線程并執(zhí)行任務(wù),直到達(dá)到最大線程數(shù);
- 最大線程也滿了以后,執(zhí)行拒絕策略;
- 當(dāng)線程空閑后,線程在達(dá)到空閑等待時(shí)間后自動(dòng)銷毀,直至數(shù)量降低至核心線程數(shù)為止。
?
線程池的意義
我們經(jīng)常說上下文切換,經(jīng)常說消耗資源,那么到底說的是誰呢?此線程爾!所以線程處于無序狀態(tài)時(shí)非常不利于程序的運(yùn)行,這時(shí)候就需要一個(gè)線程池了,線程池應(yīng)用最多的場景是多線程并發(fā),其優(yōu)點(diǎn)如下:
- 降低資源消耗!通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗;
- 提高響應(yīng)速度!當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等線程創(chuàng)建就能立即執(zhí)行;
- 提高線程的可管理性!線程是稀缺資源,要合理利用,通過線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
線程池共有五種狀態(tài),分別是:
- RUNNING:處于RUNNING狀態(tài)的線程池能夠接收新任務(wù),對新添加的任務(wù)進(jìn)行處理;
- SHUTDOWN:處于SHUTDOWN狀態(tài)的線程池不可以接收新任務(wù),但是可以對已添加的任務(wù)進(jìn)行處理;
- STOP:處于STOP狀態(tài)的線程池不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù);
- TIDYING:當(dāng)所有的任務(wù)已終止,線程池會(huì)變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時(shí),可以通過重載terminated()函數(shù)來實(shí)現(xiàn);
- TERMINATED:線程池徹底終止。
用一張圖來說明它們之間的關(guān)系:
構(gòu)造方法參數(shù)介紹
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:
?核心線程數(shù)(線程池基本大小),在沒有任務(wù)需要執(zhí)行的時(shí)候的線程池大小。當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到線程數(shù)等于該參數(shù)。 如果當(dāng)前線程數(shù)為該參數(shù),后續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行。
maximumPoolSize:
? 線程池中允許的最大線程數(shù),線程池中的當(dāng)前線程數(shù)目不會(huì)超過該值。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),如果當(dāng)前的線程數(shù)小于maximumPoolSize,則會(huì)新建線程來執(zhí)行任務(wù)。
keepAliveTime:
? 線程池空閑時(shí)的存活時(shí)間,即當(dāng)線程池沒有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間。默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時(shí)才有用。
workQueue:
?必須是BolckingQueue有界阻塞隊(duì)列,用于實(shí)現(xiàn)線程池的阻塞功能。當(dāng)線程池中的線程數(shù)超過它的corePoolSize時(shí),線程會(huì)進(jìn)入阻塞隊(duì)列進(jìn)行阻塞等待。
threadFactory:
?用于設(shè)置創(chuàng)建線程的工廠。ThreadFactory的作用就是提供創(chuàng)建線程的功能的線程工廠。他是通過newThread()方法提供創(chuàng)建線程的功能,newThread()方法創(chuàng)建的線程都是“非守護(hù)線程”而且“線程優(yōu)先級(jí)都是默認(rèn)優(yōu)先級(jí)”,默認(rèn)5還記得嗎?守護(hù)線程需要設(shè)置d開頭的一個(gè)屬性為true,都還記得吧?
handler:
?線程池拒絕策略。當(dāng)阻塞隊(duì)列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),則必須采取一種策略處理該任務(wù)。
- AbortPolicy:默認(rèn)策略,直接拋出異常;
- CallerRunsPolicy:用調(diào)用者所在的線程執(zhí)行任務(wù);
- DiscardOldestPolicy:插入阻塞隊(duì)列的頭部任務(wù),并執(zhí)行當(dāng)前任務(wù);
- DiscardPolicy:丟棄任務(wù)。
自定義一個(gè)線程池看看:
package com.codingfire.cache;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolDemo {
public static void main(String[] args) {
//創(chuàng)建阻塞隊(duì)列
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);
//創(chuàng)建工廠
ThreadFactory threadFactory = new ThreadFactory() {
AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
//創(chuàng)建線程把任務(wù)傳遞進(jìn)去
Thread thread = new Thread(r);
//設(shè)置線程名稱
thread.setName("MyThread: "+atomicInteger.getAndIncrement());
return thread;
}
};
ThreadPoolExecutor pool = new ThreadPoolExecutor(10,
10,
1,
TimeUnit.SECONDS,
queue,
threadFactory);
for (int i = 0; i < 11; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
//執(zhí)行業(yè)務(wù)
System.out.println(Thread.currentThread().getName()+" 執(zhí)行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"結(jié)束");
}
});
}
}
}
執(zhí)行結(jié)果:
下不說這個(gè)執(zhí)行結(jié)果,你看看紅色圈的空白區(qū)域,按理說執(zhí)行完結(jié)束應(yīng)該有個(gè)Process finished字樣,但這里沒有,說明被掛起了,等待新任務(wù)進(jìn)入。然后看執(zhí)行的任務(wù),我們設(shè)置的最大線程數(shù)10,任務(wù)數(shù)11,所以第十一個(gè)在前10個(gè)后執(zhí)行,不過有一點(diǎn)博主要說明,第一個(gè)任務(wù)執(zhí)行的結(jié)果和第十一個(gè)任務(wù)執(zhí)行的結(jié)果位置有可能互換,前提是執(zhí)行都要在結(jié)果前,這里是因?yàn)槲覀冊O(shè)置了固定的睡眠時(shí)間,實(shí)際中不可能會(huì)這么均勻。
預(yù)定義線程池
除了通過ThreadPoolExecutor自定義線程池外,Executor框架還提供了四種線程池,他們都可以通過工具類Executors來創(chuàng)建。下面,我們就來看看這些線程池及其特點(diǎn)。
FixedThreadPool
這是一個(gè)創(chuàng)建固定線程數(shù)的線程池,適用于滿足資源管理而需要限制當(dāng)前線程數(shù)量的場景,同時(shí)也適用于負(fù)載較重的服務(wù)器。
其定義如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
三個(gè)參數(shù)意義看下方:
-
nThreads
- FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為創(chuàng)建FixedThreadPool 時(shí)指定的參數(shù) nThreads
-
keepAliveTime
- ??????????????此處設(shè)置為了0L,代表多于的空閑線程會(huì)被立即終止
-
LinkedBlockingQueue
- FixedThreadPool 使用有界隊(duì)列 LinkedBlockingQueue 作為線程池的工作隊(duì)列(隊(duì)列的容量為 Integer.MAX_VALUE。
舉個(gè)例子吧:
package com.codingfire.cache;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolCase {
static class Thr implements Runnable {
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name);
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
exec.execute(new Thr());
Thread.sleep(10);
}
exec.shutdown();
}
}
看輸出:
由于設(shè)定最大線程數(shù)3,輸出里面最大的線程標(biāo)號(hào)也就是3,符合我們對它的期望。?
SingleThreadExecutor
使用單個(gè)工作線程來執(zhí)行一個(gè)無邊界的隊(duì)列,它適用于保證順序地執(zhí)行多個(gè)任務(wù),并且在任意時(shí)間點(diǎn),都保證只有一個(gè)線程存在。
定義如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSize 和 maximumPoolSize 被設(shè)置為 固定數(shù)值1,其他參數(shù)與 FixedThreadPool相同。SingleThreadExecutor 使用有界隊(duì)列 LinkedBlockingQueue 作為線程池的工作隊(duì)列(隊(duì)列的容量為 Integer.MAX_VALUE)。?
舉個(gè)例子看看:
package com.codingfire.cache;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadPoolCase {
static int count = 0;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
exec.execute(new Thr());
Thread.sleep(10);
}
exec.shutdown();
}
static class Thr implements Runnable {
@Override
public void run() {
String name = Thread.currentThread().getName();
for (int i = 0; i < 2; i++) {
count++;
System.out.println(name + ":" + count);
}
}
}
}
查看輸出:
即使創(chuàng)建多個(gè)任務(wù),最終輸出顯示也只有一個(gè)線程在工作。?
CachedThreadPool
這是一個(gè)大小無界的線程池,它根據(jù)需要?jiǎng)?chuàng)建新線程,適用于執(zhí)行短期異步的小任務(wù)或者是負(fù)載較輕的服務(wù)器。
其定義如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePoolSize 被設(shè)置為 0,即 核心線程數(shù)為空,maximumPoolSize 被設(shè)置為Integer.MAX_VALUE。這里把 keepAliveTime 設(shè)置為 60L,意味著 CachedThreadPool中的空閑線程等待新任務(wù)的最長時(shí)間為60秒,空閑線程超過60秒后將會(huì)被終止。
? FixedThreadPool 和 SingleThreadExecutor 使用有界隊(duì)列 LinkedBlockingQueue作為線程池的工作隊(duì)列,CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊(duì)列,但 CachedThreadPool的maximumPool是無界的,也就是說,如果主線程提交任務(wù)的速度高于 maximumPool 中線程處理任務(wù)的速度,CachedThreadPool會(huì)不斷創(chuàng)建新線程,甚至不惜因此耗盡CPU和內(nèi)存,簡直太可怕了,博主個(gè)人覺得還是能不用盡量別用,這就是個(gè)瘋子,瘋子你怕不怕?
舉個(gè)例子:
package com.codingfire.cache;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolCase {
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Thr());
Thread.sleep(10);
}
exec.shutdown();
}
static class Thr implements Runnable {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
//修改睡眠時(shí)間,模擬線程執(zhí)行需要花費(fèi)的時(shí)間
Thread.sleep(10);
System.out.println(name + "執(zhí)行完了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
直接看執(zhí)行結(jié)果:
執(zhí)行完直接結(jié)束了,是因?yàn)檎{(diào)用了shutdown(),大家嘗試注釋掉這句,等一分鐘,你會(huì)看到程序結(jié)束了:
??
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor,繼承自ThreadPoolExecutor,所以嚴(yán)格意義上算是管黨給我們提供的自定義線程池。它實(shí)現(xiàn)了ScheduledExecutorService接口,就像是提供了“延遲”和“周期執(zhí)行”功能的ThreadPoolExecutor。它可在給定的延遲后運(yùn)行命令或定期執(zhí)行命令,適用于為了滿足資源管理的需求而需要限制后臺(tái)線程數(shù)量的場景,同時(shí)也可以保證多任務(wù)的順序執(zhí)行。
它的構(gòu)造方法比較多:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
它們都是利用ThreadLocalExecutor來構(gòu)造的,唯一不同點(diǎn)在它所使用的阻塞隊(duì)列變成了DelayedWorkQueue?(延時(shí)工作隊(duì)列)。
?DelayedWorkQueue是ScheduledThreadPoolExecutor的內(nèi)部類,類似于延時(shí)隊(duì)列和優(yōu)先級(jí)隊(duì)列,在執(zhí)行定時(shí)任務(wù)的時(shí)候,DelayedWorkQueue讓任務(wù)執(zhí)行時(shí)間的升序來排列,可以保證每次出隊(duì)的任務(wù)都是當(dāng)前隊(duì)列中執(zhí)行時(shí)間最靠前的,這也就是為什么說它可以保證線程執(zhí)行的順序。
舉個(gè)例子:
package com.codingfire.cache;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPool {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
// 第二個(gè)參數(shù)是延遲多久執(zhí)行
scheduledThreadPool.schedule(new Task(), 1, TimeUnit.SECONDS);
scheduledThreadPool.schedule(new Task(), 2, TimeUnit.SECONDS);
scheduledThreadPool.schedule(new Task(), 3, TimeUnit.SECONDS);
Thread.sleep(5000);
// 關(guān)閉線程池
scheduledThreadPool.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
try {
String name = Thread.currentThread().getName();
System.out.println("線程" + name + ", 開始:" + new Date());
Thread.sleep(1000);
System.out.println("線程" + name + ", 結(jié)束:" + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
查看輸出:
我們設(shè)置最大線程數(shù)3,然后創(chuàng)建3個(gè)線程,最終輸出里,任務(wù)是按照線程編號(hào)執(zhí)行的,這個(gè)要看結(jié)束的時(shí)間。?
WorkStealingPool
這是JDK1.8中新增的線程池,利用所有運(yùn)行的CPU來創(chuàng)建一個(gè)工作竊取線程池,是對ForkJoinPool的擴(kuò)展,適用于非常耗時(shí)的操作。聽起來都很牛逼啊,在線程池界絕對是爸爸的存在。不好意思,忘了還有個(gè)瘋子線程池呢,地位不保啊。
看代碼:
package com.codingfire.cache;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class WorkStealingPoolDemo {
public static void main(String[] args) throws IOException {
//獲取當(dāng)前可用CPU核數(shù)
System.out.println(Runtime.getRuntime().availableProcessors());
//創(chuàng)建線程池
ExecutorService stealingPool = Executors.newWorkStealingPool();
stealingPool.execute(new Thr(1000));
/**
* 我現(xiàn)在CPU是4個(gè),開啟了5個(gè)線程,第一個(gè)線程一秒執(zhí)行完,其他的都是兩秒
* 此時(shí)會(huì)有一個(gè)線程進(jìn)行等待,當(dāng)?shù)谝粋€(gè)執(zhí)行完畢后,會(huì)偷取第5個(gè)線程執(zhí)行
*/
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
stealingPool.execute(new Thr(2000));
}
// 因?yàn)閣ork stealing 是deamon線程
// 所以當(dāng)main方法結(jié)束時(shí), 此方法雖然還在后臺(tái)運(yùn)行,但是無輸出
// 可以通過對主線程阻塞解決
System.in.read();
}
static class Thr implements Runnable{
int time;
public Thr(int time) {
this.time = time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" : "+time);
}
}
}
博主電腦4核,循環(huán)創(chuàng)建4個(gè),循環(huán)外1個(gè),看執(zhí)行結(jié)果:?
??
從執(zhí)行結(jié)果看,worker-1竊取了循環(huán)中最后一個(gè)線程。?第一個(gè)線程一秒執(zhí)行完,其他的都是兩秒,此時(shí)會(huì)有一個(gè)線程等待,當(dāng)?shù)谝粋€(gè)執(zhí)行完畢后,會(huì)偷取第5個(gè)線程執(zhí)行。你要是老板的話,這樣的員工你喜不喜歡?文章來源:http://www.zghlxwxcb.cn/news/detail-460550.html
最終吐槽
博主罵罵咧咧的寫完了,最終5.8w字,還是沒能把握住字?jǐn)?shù),不敢說寫的太詳細(xì),對自己是個(gè)總結(jié),對大家有用的不妨留個(gè)贊,真真寫了好幾天啊,手疼胳膊疼眼睛疼,真沒想到JUC相關(guān)的內(nèi)容這么多,但可能還有遺漏的,算了,就這么著吧,這時(shí)有人會(huì)說:博主你不是挖墳?zāi)??這就不行了?嗯?男人不能說不行,但這篇就先這樣吧,里面涉及到一些詳細(xì)的部分,以后再補(bǔ)吧,也可能會(huì)遙遙無期。兄弟們,不敢再挖了,再挖博主就要改行了。就這樣發(fā)出去大家一起看看討論討論吧。文章來源地址http://www.zghlxwxcb.cn/news/detail-460550.html
到了這里,關(guān)于Java開發(fā) - 不知道算不算詳細(xì)的JUC詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!