- ??作者簡介:大家好,我是愛敲代碼的小黃,獨(dú)角獸企業(yè)的Java開發(fā)工程師,CSDN博客專家,阿里云專家博主
- ??系列專欄:Java設(shè)計(jì)模式、數(shù)據(jù)結(jié)構(gòu)和算法、Kafka從入門到成神、Kafka從成神到升仙、Spring從成神到升仙系列
- ??如果感覺博主的文章還不錯(cuò)的話,請??三連支持??一下博主哦
- ??博主正在努力完成2023計(jì)劃中:以夢為馬,揚(yáng)帆起航,2023追夢人
- ??聯(lián)系方式:hls1793929520,加我進(jìn)群,大家一起學(xué)習(xí),一起進(jìn)步,一起對抗互聯(lián)網(wǎng)寒冬??
從源碼全面解析 ArrayBlockingQueue 的來龍去脈
一、引言
并發(fā)編程在互聯(lián)網(wǎng)技術(shù)使用如此廣泛,幾乎所有的后端技術(shù)面試官都要在并發(fā)編程的使用和原理方面對小伙伴們進(jìn)行 360° 的刁難。
作為一個(gè)在互聯(lián)網(wǎng)公司面一次拿一次 Offer
的面霸,打敗了無數(shù)競爭對手,每次都只能看到無數(shù)落寞的身影失望的離開,略感愧疚(請?jiān)试S我使用一下夸張的修辭手法)。
于是在一個(gè)寂寞難耐的夜晚,暖男我痛定思痛,決定開始寫 《吊打面試官》 系列,希望能幫助各位讀者以后面試勢如破竹,對面試官進(jìn)行 360° 的反擊,吊打問你的面試官,讓一同面試的同僚瞠目結(jié)舌,瘋狂收割大廠 Offer
!
雖然現(xiàn)在是互聯(lián)網(wǎng)寒冬,但乾坤未定,你我皆是黑馬
二、使用
對于阻塞隊(duì)列,想必大家應(yīng)該都不陌生,我們這里簡單的介紹一下,對于 Java
里面的阻塞隊(duì)列,其使用了 **生產(chǎn)者和消費(fèi)者 **的模型
對于生產(chǎn)者來說,主要有以下幾部分:
add(E) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,無法存儲(chǔ),拋出異常
offer(E) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,返回false
offer(E,timeout,unit) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,阻塞timeout時(shí)間,如果阻塞一段時(shí)間,依然沒添加進(jìn)入,返回false
put(E) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,掛起線程,等到隊(duì)列中有位置,再扔數(shù)據(jù)進(jìn)去,死等!
對于消費(fèi)者來說,主要有以下幾部分:
remove() // 從隊(duì)列中移除數(shù)據(jù),如果隊(duì)列為空,拋出異常
poll() // 從隊(duì)列中移除數(shù)據(jù),如果隊(duì)列為空,返回null,么的數(shù)據(jù)
poll(timeout,unit) // 從隊(duì)列中移除數(shù)據(jù),如果隊(duì)列為空,掛起線程timeout時(shí)間,等生產(chǎn)者扔數(shù)據(jù),再獲取
take() // 從隊(duì)列中移除數(shù)據(jù),如果隊(duì)列為空,線程掛起,一直等到生產(chǎn)者扔數(shù)據(jù),再獲取
我們本篇來講講堵塞隊(duì)列中的第一員猛將,ArrayBlockingQueue
的故事
我們簡單來寫一個(gè)小demo
public class ArrayBlockingQueueTest {
public static void main(String[] args) throws Exception {
// 必須設(shè)置隊(duì)列的長度
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 生產(chǎn)者扔數(shù)據(jù)
queue.add("1");
queue.offer("2");
queue.offer("3", 2, TimeUnit.SECONDS);
queue.put("2");
// 消費(fèi)者取數(shù)據(jù)
System.out.println(queue.remove());
System.out.println(queue.poll());
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.take());
}
}
三、源碼
1、初始化
由于我們的 ArrayBlockingQueue
底層使用的是數(shù)據(jù)結(jié)構(gòu),所以我們需要在初始化的時(shí)候指定其大小,如下:
// 設(shè)置其大小長度為 4
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 初始化
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 初始化ArrayBlockingQueue的一些初始變量
public ArrayBlockingQueue(int capacity, boolean fair) {
// 如果傳一個(gè)負(fù)數(shù),直接完蛋
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化數(shù)組items
this.items = new Object[capacity];
// 初始化lock非公平鎖
lock = new ReentrantLock(fair);
// 消費(fèi)者掛起線程和喚醒線程用到的Condition
notEmpty = lock.newCondition();
// 生產(chǎn)者掛起線程和喚醒線程用到的Condition
notFull = lock.newCondition();
}
除了我們初始化的這些變量,也有其他的一些變量:
// 存儲(chǔ)數(shù)據(jù)的下標(biāo)
int putIndex
// 取數(shù)據(jù)的下標(biāo)
int takeIndex
// 當(dāng)前數(shù)組中存儲(chǔ)的數(shù)據(jù)長度
int count
對于 ReentrantLock
和 newCondition
的知識(shí)點(diǎn),可以看以下博文:
- newCondition的源碼分析
- ReentrantLock的源碼分析
2、生產(chǎn)者的源碼
2.1 add()源碼實(shí)現(xiàn)
public boolean add(E e) {
return super.add(e);
}
// 走到這里會(huì)發(fā)現(xiàn),我們的add方法就是調(diào)用了offer方法
// offer: 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,返回false
// 所以這里offer滿了,就會(huì)拋出異常:"Queue full"
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
2.2 offer()源碼實(shí)現(xiàn)
public boolean offer(E e) {
// 檢測當(dāng)前的入?yún)⑹欠駷閚ull
// 如果是的話直接拋出異常
checkNotNull(e);
//【面試絕殺招】為什么會(huì)這樣引用使用?
final ReentrantLock lock = this.lock;
// 直接加鎖,保證線程安全
lock.lock();
try {
// 如果當(dāng)前數(shù)組存儲(chǔ)的長度等于總?cè)萘?/span>
// 直接返回false,插入失敗
if (count == items.length)
return false;
else {
// 插入
enqueue(e);
return true;
}
} finally {
// 結(jié)束之后將鎖釋放掉
lock.unlock();
}
}
// 添加數(shù)據(jù)
private void enqueue(E x) {
// 我們發(fā)現(xiàn),這引用又來了
final Object[] items = this.items;
// 當(dāng)前數(shù)組的賦值
items[putIndex] = x;
// 如果下標(biāo)等于我們的總?cè)萘浚枰匦轮孟聵?biāo)值為0
if (++putIndex == items.length)
putIndex = 0;
// 數(shù)組容量加一
count++;
// 喚醒消費(fèi)者等待的線程
notEmpty.signal();
}
2.3 offer(time,unit)源碼實(shí)現(xiàn)
生產(chǎn)者在添加數(shù)據(jù)時(shí),如果隊(duì)列已經(jīng)滿了,阻塞一會(huì)
- 阻塞到消費(fèi)者消費(fèi)了消息,然后喚醒當(dāng)前阻塞線程
- 阻塞到了
time
時(shí)間,再次判斷是否可以添加,不能,直接告辭
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {
// 檢測當(dāng)前的入?yún)⑹欠駷榭?/span>
checkNotNull(e);
// 統(tǒng)一時(shí)間格式
long nanos = unit.toNanos(timeout);
// 持有引用
final ReentrantLock lock = this.lock;
// 允許的中斷的加鎖方式
lock.lockInterruptibly();
try {
// 如果當(dāng)前的存儲(chǔ)等于數(shù)組的長度
// 這里為什么不能用if判斷,需要用while,牽扯到虛假喚醒,我們后面聊
while (count == items.length) {
// 時(shí)間小于0,直接返回false
if (nanos <= 0)
return false;
// 掛起當(dāng)前線程
nanos = notFull.awaitNanos(nanos);
}
// 添加到數(shù)組中
enqueue(e);
return true;
} finally {
// 解鎖
lock.unlock();
}
}
2.4 put()源碼實(shí)現(xiàn)
public void put(E e) throws InterruptedException {
// 檢測當(dāng)前的入?yún)⑹欠駷榭?/span>
checkNotNull(e);
// 持有引用
final ReentrantLock lock = this.lock;
// 允許的中斷的加鎖方式
lock.lockInterruptibly();
try {
// 如果當(dāng)前的存儲(chǔ)等于數(shù)組的長度
// 這里為什么不能用if判斷,需要用while,牽扯到虛假喚醒,我們后面聊
while (count == items.length)
// 無時(shí)間掛起當(dāng)前線程
notFull.await();
// 添加到隊(duì)列
enqueue(e);
} finally {
// 解鎖
lock.unlock();
}
}
通過上面的源碼分析,我們應(yīng)該可以理解上面說的這幾句話了
add(E) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,無法存儲(chǔ),拋出異常
offer(E) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,返回false
offer(E,timeout,unit) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,阻塞timeout時(shí)間,如果阻塞一段時(shí)間,依然沒添加進(jìn)入,返回false
put(E) // 添加數(shù)據(jù)到隊(duì)列,如果隊(duì)列滿了,掛起線程,等到隊(duì)列中有位置,再扔數(shù)據(jù)進(jìn)去,死等!
接著我們講兩個(gè)小細(xì)節(jié),也是面試震驚面試官的地方
2.5 final ReentrantLock lock = this.lock
在我們 Doug Lea
里寫的代碼中,java.util.concurrent
包下 和 HashMap
中都有類似的寫法
這種寫法到底有什么好處呢,為什么我們不能直接使用成員變量 lock
來進(jìn)行加鎖解鎖
感興趣的可以看下這篇博文:原因
不感興趣的可以看我的總結(jié)分析:
首先我們需要準(zhǔn)備下面兩個(gè)代碼,將其反編譯得到 Java
字節(jié)碼
-
引用狀態(tài)
public void get1(){ final ReentrantLock lock = this.lock; lock.lock(); }
-
非引用狀態(tài)
public void get2(){ lock.lock(); }
通過對比字節(jié)碼我們發(fā)現(xiàn),引用狀態(tài)的字節(jié)碼相較于非引用狀態(tài)少了一個(gè)指令:getfield
而這個(gè)缺少的指令,也正是 Doug Lea
優(yōu)化的來源:從棧讀變量比從堆讀變量會(huì)更cache-friendly
,本地變量最終綁定到CPU寄存器的可能性更高。
但由于現(xiàn)在的 Java
編譯器已經(jīng)非常先進(jìn)了,不論采用哪種方式,最終形成的機(jī)器指令都是一樣的
所以,Doug Lea
的優(yōu)化在之前是字節(jié)碼層面的優(yōu)化,但如今確實(shí)沒有卵用
2.6 虛假喚醒
我們上面有一段 while
循環(huán)的代碼:
// 如果當(dāng)前的存儲(chǔ)等于數(shù)組的長度
// 這里為什么不能用if判斷,需要用while,牽扯到虛假喚醒,我們后面聊
while (count == items.length) {
// 無時(shí)間掛起當(dāng)前線程
notFull.await();
}
// 添加到隊(duì)列
enqueue(e);
我們 A
線程判斷數(shù)組內(nèi)還有空余,則放入數(shù)組
我們 B
線程判斷其 count == items.length
進(jìn)入掛起狀態(tài),當(dāng)我們的 B
線程被喚醒時(shí),如果不經(jīng)歷 count == items.length
的過程,就會(huì)將我們 A
線程的 3
數(shù)據(jù)給覆蓋掉
3、消費(fèi)者的源碼
3.1 remove()源碼實(shí)現(xiàn)
- 主要使用了我們的poll方法
public E remove() {
// 直接調(diào)用poll方法
E x = poll();
// 如果有數(shù)據(jù)則返回
// 無數(shù)據(jù)則拋出異常
if (x != null)
return x;
else
throw new NoSuchElementException();
}
3.2 poll() 源碼實(shí)現(xiàn)
public E poll() {
// 還是引用
final ReentrantLock lock = this.lock;
// 鎖一下
lock.lock();
try {
// 判斷數(shù)組容量是否等于0(數(shù)組無容量),返回null
// 如果數(shù)組中有數(shù)據(jù),則進(jìn)行dequeue方法
return (count == 0) ? null : dequeue();
} finally {
// 解鎖
lock.unlock();
}
}
private E dequeue() {
// 還是引用
final Object[] items = this.items;
// 當(dāng)前彈出數(shù)組的下標(biāo)
E x = (E) items[takeIndex];
// 彈出后將當(dāng)前下標(biāo)的數(shù)據(jù)置為空
items[takeIndex] = null;
// 如果我們的彈出下標(biāo)和我們數(shù)組的大小一樣時(shí),需要更新彈出下標(biāo)
if (++takeIndex == items.length)
takeIndex = 0;
// 數(shù)組數(shù)據(jù)數(shù)量減一
count--;
// 迭代器內(nèi)容,先忽略
if (itrs != null)
itrs.elementDequeued();
// 喚醒生產(chǎn)者的線程
notFull.signal();
return x;
}
3.3 poll(time,unit)源碼實(shí)現(xiàn)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 將時(shí)間轉(zhuǎn)化成統(tǒng)一單位
long nanos = unit.toNanos(timeout);
// 引用
final ReentrantLock lock = this.lock;
// 可中斷的加鎖
lock.lockInterruptibly();
try {
// 看一下當(dāng)前的數(shù)組還有容量沒
while (count == 0) {
// 如果沒有容量并且時(shí)間也到期了,返回null
if (nanos <= 0)
return null;
// 進(jìn)入帶有時(shí)間的等待狀態(tài)(扔到Condition隊(duì)列中)
nanos = notEmpty.awaitNanos(nanos);
}
// 被喚醒后并且當(dāng)前的數(shù)組有容量
// 彈出隊(duì)列中的數(shù)據(jù)即可
return dequeue();
} finally {
// 解鎖
lock.unlock();
}
}
3.4 take()源碼實(shí)現(xiàn)
public E take() throws InterruptedException {
// 引用
final ReentrantLock lock = this.lock;
// 可中斷的加鎖
lock.lockInterruptibly();
try {
// 看一下當(dāng)前的數(shù)組還有容量沒
while (count == 0){
// 沒容量直接扔Condition隊(duì)列等待
notEmpty.await();
}
// 被喚醒后并且當(dāng)前的數(shù)組有容量
// 彈出隊(duì)列中的數(shù)據(jù)即可
return dequeue();
} finally {
// 解鎖
lock.unlock();
}
}
四、流程圖
私聊我獲取高清流程圖
五、總結(jié)
魯迅先生曾說:獨(dú)行難,眾行易,和志同道合的人一起進(jìn)步。彼此毫無保留的分享經(jīng)驗(yàn),才是對抗互聯(lián)網(wǎng)寒冬的最佳選擇。
其實(shí)很多時(shí)候,并不是我們不夠努力,很可能就是自己努力的方向不對,如果有一個(gè)人能稍微指點(diǎn)你一下,你真的可能會(huì)少走幾年彎路。
如果你也對 后端架構(gòu)和中間件源碼 有興趣,歡迎添加博主微信:hls1793929520,一起學(xué)習(xí),一起成長
我是愛敲代碼的小黃,獨(dú)角獸企業(yè)的Java開發(fā)工程師,CSDN博客專家,喜歡后端架構(gòu)和中間件源碼。
我們下期再見。
我從清晨走過,也擁抱夜晚的星辰,人生沒有捷徑,你我皆平凡,你好,陌生人,一起共勉。文章來源:http://www.zghlxwxcb.cn/news/detail-454668.html
往期文章推薦:文章來源地址http://www.zghlxwxcb.cn/news/detail-454668.html
- 從根上剖析ReentrantLock的來龍去脈
- 閱讀完synchronized和ReentrantLock的源碼后,我竟發(fā)現(xiàn)其完全相似
- 從源碼全面解析 ThreadLocal 關(guān)鍵字的來龍去脈
- 從源碼全面解析 synchronized 關(guān)鍵字的來龍去脈
- 阿里面試官讓我講講volatile,我直接從HotSpot開始講起,一套組合拳拿下面試
到了這里,關(guān)于從源碼全面解析 ArrayBlockingQueue 的來龍去脈的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!