四、阻塞隊列
一、基礎(chǔ)概念
1.1 生產(chǎn)者消費者概念
生產(chǎn)者消費者是設(shè)計模式的一種。讓生產(chǎn)者和消費者基于一個容器來解決強耦合問題。
生產(chǎn)者 消費者彼此之間不會直接通訊的,而是通過一個容器(隊列)進行通訊。
所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)后扔到容器中,不通用等待消費者來處理。
消費者不需要去找生產(chǎn)者要數(shù)據(jù),直接從容器中獲取即可。
而這種容器最常用的結(jié)構(gòu)就是隊列。
1.2 JUC阻塞隊列的存取方法
常用的存取方法都是來自于JUC包下的BlockingQueue
生產(chǎn)者存儲方法
add(E) // 添加數(shù)據(jù)到隊列,如果隊列滿了,無法存儲,拋出異常
offer(E) // 添加數(shù)據(jù)到隊列,如果隊列滿了,返回false
offer(E,timeout,unit) // 添加數(shù)據(jù)到隊列,如果隊列滿了,阻塞timeout時間,如果阻塞一段時間,依然沒添加進入,返回false
put(E) // 添加數(shù)據(jù)到隊列,如果隊列滿了,掛起線程,等到隊列中有位置,再扔數(shù)據(jù)進去,死等!
消費者取數(shù)據(jù)方法
remove() // 從隊列中移除數(shù)據(jù),如果隊列為空,拋出異常
poll() // 從隊列中移除數(shù)據(jù),如果隊列為空,返回null,么的數(shù)據(jù)
poll(timeout,unit) // 從隊列中移除數(shù)據(jù),如果隊列為空,掛起線程timeout時間,等生產(chǎn)者扔數(shù)據(jù),再獲取
take() // 從隊列中移除數(shù)據(jù),如果隊列為空,線程掛起,一直等到生產(chǎn)者扔數(shù)據(jù),再獲取
二、ArrayBlockingQueue
2.1 ArrayBlockingQueue的基本使用
ArrayBlockingQueue在初始化的時候,必須指定當前隊列的長度。
因為ArrayBlockingQueue是基于數(shù)組實現(xiàn)的隊列結(jié)構(gòu),數(shù)組長度不可變,必須提前設(shè)置數(shù)組長度信息。
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
// 必須設(shè)置隊列的長度
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 生產(chǎn)者扔數(shù)據(jù)
queue.add("1");
queue.offer("2");
queue.offer("3",2,TimeUnit.SECONDS);
queue.put("2");
// 消費者取數(shù)據(jù)
System.out.println(queue.remove());
System.out.println(queue.poll());
System.out.println(queue.poll(2,TimeUnit.SECONDS));
System.out.println(queue.take());
}
2.2 生產(chǎn)者方法實現(xiàn)原理
生產(chǎn)者添加數(shù)據(jù)到隊列的方法比較多,需要一個一個查看
2.2.1 ArrayBlockingQueue的常見屬性
ArrayBlockingQueue中的成員變量
lock = 就是一個ReentrantLock
count = 就是當前數(shù)組中元素的個數(shù)
iterms = 就是數(shù)組本身
# 基于putIndex和takeIndex將數(shù)組結(jié)構(gòu)實現(xiàn)為了隊列結(jié)構(gòu)
putIndex = 存儲數(shù)據(jù)時的下標
takeIndex = 去數(shù)據(jù)時的下標
notEmpty = 消費者掛起線程和喚醒線程用到的Condition(看成sync的wait和notify)
notFull = 生產(chǎn)者掛起線程和喚醒線程用到的Condition(看成sync的wait和notify)
2.2.2 add方法實現(xiàn)
add方法本身就是調(diào)用了offer方法,如果offer方法返回false,直接拋出異常
public boolean add(E e) {
if (offer(e))
return true;
else
// 拋出的異常
throw new IllegalStateException("Queue full");
}
2.2.3 offer方法實現(xiàn)
public boolean offer(E e) {
// 要求存儲的數(shù)據(jù)不允許為null,為null就拋出空指針
checkNotNull(e);
// 當前阻塞隊列的lock鎖
final ReentrantLock lock = this.lock;
// 為了保證線程安全,加鎖
lock.lock();
try {
// 如果隊列中的元素已經(jīng)存滿了,
if (count == items.length)
// 返回false
return false;
else {
// 隊列沒滿,執(zhí)行enqueue將元素添加到隊列中
enqueue(e);
// 返回true
return true;
}
} finally {
// 操作完釋放鎖
lock.unlock();
}
}
//==========================================================
private void enqueue(E x) {
// 拿到數(shù)組的引用
final Object[] items = this.items;
// 將元素放到指定位置
items[putIndex] = x;
// 對inputIndex進行++操作,并且判斷是否已經(jīng)等于數(shù)組長度,需要歸位
if (++putIndex == items.length)
// 將索引設(shè)置為0
putIndex = 0;
// 元素添加成功,進行++操作。
count++;
// 將一個Condition中阻塞的線程喚醒。
notEmpty.signal();
}
2.2.4 offer(time,unit)方法
生產(chǎn)者在添加數(shù)據(jù)時,如果隊列已經(jīng)滿了,阻塞一會。
- 阻塞到消費者消費了消息,然后喚醒當前阻塞線程
- 阻塞到了time時間,再次判斷是否可以添加,不能,直接告辭。
// 如果線程在掛起的時候,如果對當前阻塞線程的中斷標記位進行設(shè)置,此時會拋出異常直接結(jié)束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 非空檢驗
checkNotNull(e);
// 將時間單位轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
// 加鎖
final ReentrantLock lock = this.lock;
// 允許線程中斷并排除異常的加鎖方式
lock.lockInterruptibly();
try {
// 為什么是while(虛假喚醒)
// 如果元素個數(shù)和數(shù)組長度一致,隊列慢了
while (count == items.length) {
// 判斷等待的時間是否還充裕
if (nanos <= 0)
// 不充裕,直接添加失敗
return false;
// 掛起等待,會同時釋放鎖資源(對標sync的wait方法)
// awaitNanos會掛起線程,并且返回剩余的阻塞時間
// 恢復執(zhí)行時,需要重新獲取鎖資源
nanos = notFull.awaitNanos(nanos);
}
// 說明隊列有空間了,enqueue將數(shù)據(jù)扔到阻塞隊列中
enqueue(e);
return true;
} finally {
// 釋放鎖資源
lock.unlock();
}
}
2.2.5 put方法
如果隊列是滿的, 就一直掛起,直到被喚醒,或者被中斷
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// await方法一直阻塞,直到被喚醒或者中斷標記位
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
2.3 消費者方法實現(xiàn)原理
2.3.1 remove方法
// remove方法就是調(diào)用了poll
public E remove() {
E x = poll();
// 如果有數(shù)據(jù),直接返回
if (x != null)
return x;
// 沒數(shù)據(jù)拋出異常
else
throw new NoSuchElementException();
}
2.4.2 poll方法
// 拉取數(shù)據(jù)
public E poll() {
// 加鎖操作
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果沒有數(shù)據(jù),直接返回null,如果有數(shù)據(jù),執(zhí)行dequeue,取出數(shù)據(jù)并返回
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//==========================================================
// 取出數(shù)據(jù)
private E dequeue() {
// 將成員變量引用到局部變量
final Object[] items = this.items;
// 直接獲取指定索引位置的數(shù)據(jù)
E x = (E) items[takeIndex];
// 將數(shù)組上指定索引位置設(shè)置為null
items[takeIndex] = null;
// 設(shè)置下次取數(shù)據(jù)時的索引位置
if (++takeIndex == items.length)
takeIndex = 0;
// 對count進行--操作
count--;
// 迭代器內(nèi)容,先跳過
if (itrs != null)
itrs.elementDequeued();
// signal方法,會喚醒當前Condition中排隊的一個Node。
// signalAll方法,會將Condition中所有的Node,全都喚醒
notFull.signal();
// 返回數(shù)據(jù)。
return x;
}
2.4.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 轉(zhuǎn)換時間單位
long nanos = unit.toNanos(timeout);
// 競爭鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果沒有數(shù)據(jù)
while (count == 0) {
if (nanos <= 0)
// 沒數(shù)據(jù),也無法阻塞了,返回null
return null;
// 沒數(shù)據(jù),掛起消費者線程
nanos = notEmpty.awaitNanos(nanos);
}
// 取數(shù)據(jù)
return dequeue();
} finally {
lock.unlock();
}
}
2.4.4 take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 虛假喚醒
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
2.4.5 虛假喚醒
阻塞隊列中,如果需要線程掛起操作,判斷有無數(shù)據(jù)的位置采用的是while循環(huán) ,為什么不能換成if
肯定是不能換成if邏輯判斷
線程A,線程B,線程E,線程C。 其中ABE生產(chǎn)者,C屬于消費者
假如線程的隊列是滿的
// E,拿到鎖資源,還沒有走while判斷
while (count == items.length)
// A醒了
// B掛起
notFull.await();
enqueue(e);
C此時消費一條數(shù)據(jù),執(zhí)行notFull.signal()喚醒一個線程,A線程被喚醒
E走判斷,發(fā)現(xiàn)有空余位置,可以添加數(shù)據(jù)到隊列,E添加數(shù)據(jù),走enqueue
如果判斷是if,A在E釋放鎖資源后,拿到鎖資源,直接走enqueue方法。
此時A線程就是在putIndex的位置,覆蓋掉之前的數(shù)據(jù),造成數(shù)據(jù)安全問題
三、LinkedBlockingQueue
3.1 LinkedBlockingQueue的底層實現(xiàn)
查看LinkedBlockingQueue是如何存儲數(shù)據(jù),并且實現(xiàn)鏈表結(jié)構(gòu)的。
// Node對象就是存儲數(shù)據(jù)的單位
static class Node<E> {
// 存儲的數(shù)據(jù)
E item;
// 指向下一個數(shù)據(jù)的指針
Node<E> next;
// 有參構(gòu)造
Node(E x) { item = x; }
}
查看LinkedBlockingQueue的有參構(gòu)造
// 可以手動指定LinkedBlockingQueue的長度,如果沒有指定,默認為Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 在初始化時,構(gòu)建一個item為null的節(jié)點,作為head和last
// 這種node可以成為哨兵Node,
// 如果沒有哨兵節(jié)點,那么在獲取數(shù)據(jù)時,需要判斷head是否為null,才能找next
// 如果沒有哨兵節(jié)點,那么在添加數(shù)據(jù)時,需要判斷l(xiāng)ast是否為null,才能找next
last = head = new Node<E>(null);
}
查看LinkedBlockingQueue的其他屬性
// 因為是鏈表,沒有想數(shù)組的length屬性,基于AtomicInteger來記錄長度
private final AtomicInteger count = new AtomicInteger();
// 鏈表的頭,取
transient Node<E> head;
// 鏈表的尾,存
private transient Node<E> last;
// 消費者的鎖
private final ReentrantLock takeLock = new ReentrantLock();
// 消費者的掛起操作,以及喚醒用的condition
private final Condition notEmpty = takeLock.newCondition();
// 生產(chǎn)者的鎖
private final ReentrantLock putLock = new ReentrantLock();
// 生產(chǎn)者的掛起操作,以及喚醒用的condition
private final Condition notFull = putLock.newCondition();
3.2 生產(chǎn)者方法實現(xiàn)原理
3.2.1 add方法
你懂得,還是走offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
3.2.2 offer方法
public boolean offer(E e) {
// 非空校驗
if (e == null) throw new NullPointerException();
// 拿到存儲數(shù)據(jù)條數(shù)的count
final AtomicInteger count = this.count;
// 查看當前數(shù)據(jù)條數(shù),是否等于隊列限制長度,達到了這個長度,直接返回false
if (count.get() == capacity)
return false;
// 聲明c,作為標記存在
int c = -1;
// 將存儲的數(shù)據(jù)封裝為Node對象
Node<E> node = new Node<E>(e);
// 獲取生產(chǎn)者的鎖。
final ReentrantLock putLock = this.putLock;
// 競爭鎖資源
putLock.lock();
try {
// 再次做一個判斷,查看是否還有空間
if (count.get() < capacity) {
// enqueue,扔數(shù)據(jù)
enqueue(node);
// 將數(shù)據(jù)個數(shù) + 1
c = count.getAndIncrement();
// 拿到count的值 小于 長度限制
// 有生產(chǎn)者在基于await掛起,這里添加完數(shù)據(jù)后,發(fā)現(xiàn)還有空間可以存儲數(shù)據(jù),
// 喚醒前面可能已經(jīng)掛起的生產(chǎn)者
// 因為這里生產(chǎn)者和消費者不是互斥的,寫操作進行的同時,可能也有消費者在消費數(shù)據(jù)。
if (c + 1 < capacity)
// 喚醒生產(chǎn)者
notFull.signal();
}
} finally {
// 釋放鎖資源
putLock.unlock();
}
// 如果c == 0,代表添加數(shù)據(jù)之前,隊列元素個數(shù)是0個。
// 如果有消費者在隊列沒有數(shù)據(jù)的時候,來消費,此時消費者一定會掛起線程
if (c == 0)
// 喚醒消費者
signalNotEmpty();
// 添加成功返回true,失敗返回-1
return c >= 0;
}
//================================================
private void enqueue(Node<E> node) {
// 將當前Node設(shè)置為last的next,并且再將當前Node作為last
last = last.next = node;
}
//================================================
private void signalNotEmpty() {
// 獲取讀鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 喚醒。
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
sync -> wait / notify
3.2.3 offer(time,unit)方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 非空檢驗
if (e == null) throw new NullPointerException();
// 將時間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
// 標記
int c = -1;
// 寫鎖,數(shù)據(jù)條數(shù)
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 允許中斷的加鎖方式
putLock.lockInterruptibly();
try {
// 如果元素個數(shù)和限制個數(shù)一致,直接準備掛起
while (count.get() == capacity) {
// 掛起的時間是不是已經(jīng)沒了
if (nanos <= 0)
// 添加失敗,返回false
return false;
// 掛起線程
nanos = notFull.awaitNanos(nanos);
}
// 有空余位置,enqueue添加數(shù)據(jù)
enqueue(new Node<E>(e));
// 元素個數(shù) + 1
c = count.getAndIncrement();
// 當前添加完數(shù)據(jù),還有位置可以添加數(shù)據(jù),喚醒可能阻塞的生產(chǎn)者
if (c + 1 < capacity)
notFull.signal();
} finally {
// 釋放鎖
putLock.unlock();
}
// 如果之前元素個數(shù)是0,喚醒可能等待的消費者
if (c == 0)
signalNotEmpty();
return true;
}
3.2.4 put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 一直掛起線程,等待被喚醒
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
3.3 消費者方法實現(xiàn)原理
從remove方法開始,查看消費者獲取數(shù)據(jù)的方式
3.3.1 remove方法
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
3.3.2 poll方法
public E poll() {
// 拿到隊列數(shù)據(jù)個數(shù)的計數(shù)器
final AtomicInteger count = this.count;
// 當前隊列中數(shù)據(jù)是否0
if (count.get() == 0)
// 說明隊列沒數(shù)據(jù),直接返回null即可
return null;
// 聲明返回結(jié)果
E x = null;
// 標記
int c = -1;
// 獲取消費者的takeLock
final ReentrantLock takeLock = this.takeLock;
// 加鎖
takeLock.lock();
try {
// 基于DCL,確保當前隊列中依然有元素
if (count.get() > 0) {
// 從隊列中移除數(shù)據(jù)
x = dequeue();
// 將之前的元素個數(shù)獲取,并--
c = count.getAndDecrement();
if (c > 1)
// 如果依然有數(shù)據(jù),繼續(xù)喚醒await的消費者。
notEmpty.signal();
}
} finally {
// 釋放鎖資源
takeLock.unlock();
}
// 如果之前的元素個數(shù)為當前隊列的限制長度,
// 現(xiàn)在消費者消費了一個數(shù)據(jù),多了一個空位可以添加
if (c == capacity)
// 喚醒阻塞的生產(chǎn)者
signalNotFull();
return x;
}
//================================================
private E dequeue() {
// 拿到隊列的head位置數(shù)據(jù)
Node<E> h = head;
// 拿到了head的next,因為這個是哨兵Node,需要拿到的head.next的數(shù)據(jù)
Node<E> first = h.next;
// 將之前的哨兵Node.next置位null。help GC。
h.next = h;
// 將first置位新的head
head = first;
// 拿到返回結(jié)果first節(jié)點的item數(shù)據(jù),也就是之前head.next.item
E x = first.item;
// 將first數(shù)據(jù)置位null,作為新的head
first.item = null;
// 返回數(shù)據(jù)
return x;
}
//================================================
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 喚醒生產(chǎn)者。
notFull.signal();
} finally {
putLock.unlock();
}
}
3.3.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 返回結(jié)果
E x = null;
// 標識
int c = -1;
// 將掛起實現(xiàn)設(shè)置為納秒級別
long nanos = unit.toNanos(timeout);
// 拿到計數(shù)器
final AtomicInteger count = this.count;
// take鎖加鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果沒數(shù)據(jù),進到while
while (count.get() == 0) {
if (nanos <= 0)
return null;
// 掛起當前線程
nanos = notEmpty.awaitNanos(nanos);
}
// 剩下內(nèi)容,和之前一樣。
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3.3.4 take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 相比poll(time,unit)方法,這里的出口只有一個,就是中斷標記位,拋出異常,否則一直等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
四、PriorityBlockingQueue概念
4.1 PriorityBlockingQueue介紹
首先PriorityBlockingQueue是一個優(yōu)先級隊列,他不滿足先進先出的概念。
會將查詢的數(shù)據(jù)進行排序,排序的方式就是基于插入數(shù)據(jù)值的本身。
如果是自定義對象必須要實現(xiàn)Comparable接口才可以添加到優(yōu)先級隊列
排序的方式是基于二叉堆實現(xiàn)的。底層是采用數(shù)據(jù)結(jié)構(gòu)實現(xiàn)的二叉堆。
4.2 二叉堆結(jié)構(gòu)介紹
優(yōu)先級隊列PriorityBlockingQueue基于二叉堆實現(xiàn)的。
private transient Object[] queue;
PriorityBlockingQueue是基于數(shù)組實現(xiàn)的二叉堆。
二叉堆是什么?
- 二叉堆就是一個完整的二叉樹。
- 任意一個節(jié)點大于父節(jié)點或者小于父節(jié)點
- 基于同步的方式,可以定義出小頂堆和大頂堆
小頂堆以及小頂堆基于數(shù)據(jù)實現(xiàn)的方式。
4.3 PriorityBlockingQueue核心屬性
// 數(shù)組的初始長度
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 數(shù)組的最大長度
// -8的目的是為了適配各個版本的虛擬機
// 默認當前使用的hotspot虛擬機最大支持Integer.MAX_VALUE - 2,但是其他版本的虛擬機不一定。
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 存儲數(shù)據(jù)的數(shù)組,也是基于這個數(shù)組實現(xiàn)的二叉堆。
private transient Object[] queue;
// size記錄當前阻塞隊列中元素的個數(shù)
private transient int size;
// 要求使用的對象要實現(xiàn)Comparable比較器?;赾omparator做對象之間的比較
private transient Comparator<? super E> comparator;
// 實現(xiàn)阻塞隊列的lock鎖
private final ReentrantLock lock;
// 掛起線程操作。
private final Condition notEmpty;
// 因為PriorityBlockingQueue的底層是基于二叉堆的,而二叉堆又是基于數(shù)組實現(xiàn)的,數(shù)組長度是固定的,如果需要擴容,需要構(gòu)建一個新數(shù)組。PriorityBlockingQueue在做擴容操作時,不會lock住的,釋放lock鎖,基于allocationSpinLock屬性做標記,來避免出現(xiàn)并發(fā)擴容的問題。
private transient volatile int allocationSpinLock;
// 阻塞隊列中用到的原理,其實就是普通的優(yōu)先級隊列。
private PriorityQueue<E> q;
4.4 PriorityBlockingQueue的寫入操作
畢竟是阻塞隊列,添加數(shù)據(jù)的操作,咱們是很了解,還是add,offer,offer(time,unit),put。但是因為優(yōu)先級隊列中,數(shù)組是可以擴容的,雖然有長度限制,但是依然屬于無界隊列的概念,所以生產(chǎn)者不會阻塞,所以只有offer方法可以查看。
這次核心的內(nèi)容并不是添加數(shù)據(jù)的區(qū)別。主要關(guān)注的是如何保證二叉堆中小頂堆的結(jié)構(gòu)的,并且還要查看數(shù)組擴容的一個過程是怎樣的。
4.4.1 offer基本流程
因為add方法依然調(diào)用的是offer方法,直接查看offer方法即可
public boolean offer(E e) {
// 非空判斷。
if (e == null)
throw new NullPointerException();
// 拿到鎖,直接上鎖
final ReentrantLock lock = this.lock;
lock.lock();
// n:size,元素的個數(shù)
// cap:當前數(shù)組的長度
// array:就是存儲數(shù)據(jù)的數(shù)組
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
// 如果元素個數(shù)大于等于數(shù)組的長度,需要嘗試擴容。
tryGrow(array, cap);
try {
// 拿到了比較器
Comparator<? super E> cmp = comparator;
// 比較數(shù)據(jù)大小,存儲數(shù)據(jù),是否需要做上移操作,保證平衡的
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 元素個數(shù) + 1
size = n + 1;
// 如果有掛起的線程,需要去喚醒掛起的消費者。
notEmpty.signal();
} finally {
// 釋放鎖
lock.unlock();
}
// 返回true
return true;
}
4.4.2 offer擴容操作
在添加數(shù)據(jù)之前,會采用while循環(huán)的方式,來判斷當前元素個數(shù)是否大于等于數(shù)組長度。如果滿足,需要執(zhí)行tryGrow方法,對數(shù)組進行擴容
如果兩個線程同時執(zhí)行tryGrow,只會有一個線程在擴容,另一個線程可能多次走while循環(huán),多次走tryGrow方法,但是依然需要等待前面的線程擴容完畢。
private void tryGrow(Object[] array, int oldCap) {
// 釋放鎖資源。
lock.unlock();
// 聲明新數(shù)組。
Object[] newArray = null;
// 如果allocationSpinLock屬性值為0,說明當前沒有線程正在擴容的。
if (allocationSpinLock == 0 &&
// 基于CAS的方式,將allocationSpinLock從0修改為1,代表當前線程可以開始擴容
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
try {
// 計算新數(shù)組長度
int newCap = oldCap + ((oldCap < 64) ?
// 如果數(shù)組長度比較小,這里加快擴容長度速度。
(oldCap + 2) :
// 如果長度大于等于64了,每次擴容到1.5倍即可。
(oldCap >> 1));
// 如果新數(shù)組長度大于MAX_ARRAY_SIZE,需要做點事了。
if (newCap - MAX_ARRAY_SIZE > 0) {
// 聲明minCap,長度為老數(shù)組 + 1
int minCap = oldCap + 1;
// 老數(shù)組+1變?yōu)樨摂?shù),或者老數(shù)組長度已經(jīng)大于MAX_ARRAY_SIZE了,無法擴容了。
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
// 告辭,涼涼~~~~
throw new OutOfMemoryError();
// 如果沒有超過限制,直接設(shè)置為最大長度即可
newCap = MAX_ARRAY_SIZE;
}
// 新數(shù)組長度,得大于老數(shù)組長度,
// 第二個判斷確保沒有并發(fā)擴容的出現(xiàn)。
if (newCap > oldCap && queue == array)
// 構(gòu)建出新數(shù)組
newArray = new Object[newCap];
} finally {
// 新數(shù)組有了,標記位歸0~~
allocationSpinLock = 0;
}
}
// 如果到了這,newArray依然為null,說明這個線程沒有進到if方法中,去構(gòu)建新數(shù)組
if (newArray == null)
// 稍微等一手。
Thread.yield();
// 拿鎖資源,
lock.lock();
// 拿到鎖資源后,確認是構(gòu)建了新數(shù)組的線程,這里就需要將新數(shù)組復制給queue,并且導入數(shù)據(jù)
if (newArray != null && queue == array) {
// 將新數(shù)組賦值給queue
queue = newArray;
// 將老數(shù)組的數(shù)據(jù)全部導入到新數(shù)組中。
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
4.4.3 offer添加數(shù)據(jù)-上移平衡
這里是數(shù)據(jù)如何放到數(shù)組上,并且如何保證的二叉堆結(jié)構(gòu)
// k:當前元素的個數(shù)(其實就是要放的索引位置)
// x:需要添加的數(shù)據(jù)
// array:數(shù)組。。
private static <T> void siftUpComparable(int k, T x, Object[] array) {
// 將插入的元素直接強轉(zhuǎn)為Comparable(com.mashibing.User cannot be cast to java.lang.Comparable)
// 這行強轉(zhuǎn),會導致添加沒有實現(xiàn)Comparable的元素,直接報錯。
Comparable<? super T> key = (Comparable<? super T>) x;
// k大于0,走while邏輯。(原來有數(shù)據(jù))
while (k > 0) {
// 獲取父節(jié)點的索引位置。
int parent = (k - 1) >>> 1;
// 拿到父節(jié)點的元素。
Object e = array[parent];
// 用子節(jié)點compareTo父節(jié)點,如果 >= 0,說明當前son節(jié)點比parent要大。
if (key.compareTo((T) e) >= 0)
// 直接break,完事,
break;
// 將son節(jié)點的位置設(shè)置上之前的parent節(jié)點
array[k] = e;
// 重新設(shè)置x節(jié)點需要放置的位置。
k = parent;
}
// k == 0,當前元素是第一個元素,直接插入進去。
array[k] = key;
}
4.5 PriorityBlockingQueue的讀取操作
讀取操作是存儲現(xiàn)在掛起的情況的,因為如果數(shù)組中元素個數(shù)為0,當前線程如果執(zhí)行了take方法,必然需要掛起。
其次獲取數(shù)據(jù),因為是優(yōu)先級隊列,所以需要從二叉堆棧頂拿數(shù)據(jù),直接拿索引為0的數(shù)據(jù)即可,但是拿完之后,需要保持二叉堆結(jié)構(gòu),所以會有下移操作。
4.5.1 查看獲取方法流程
poll:
public E poll() {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 拿到返回數(shù)據(jù),沒拿到,返回null
return dequeue();
} finally {
lock.unlock();
}
}
poll(time,unit):
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 將掛起的時間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 允許線程中斷拋異常的加鎖
lock.lockInterruptibly();
// 聲明結(jié)果
E result;
try {
// dequeue是去拿數(shù)據(jù)的,可能會出現(xiàn)拿到的數(shù)據(jù)為null,如果為null,同時掛起時間還有剩余,這邊就直接通過notEmpty掛起線程
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
// 有數(shù)據(jù)正常返回,沒數(shù)據(jù),告辭~
return result;
}
take:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
// 無線等,要么有數(shù)據(jù),要么中斷線程
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
4.5.2 查看dequeue獲取數(shù)據(jù)
獲取數(shù)據(jù)主要就是從數(shù)組中拿到0索引位置數(shù)據(jù),然后保持二叉堆結(jié)構(gòu)
private E dequeue() {
// 將元素個數(shù)-1,拿到了索引位置。
int n = size - 1;
// 判斷是不是木有數(shù)據(jù)了,沒數(shù)據(jù)直接返回null即可
if (n < 0)
return null;
// 說明有數(shù)據(jù)
else {
// 拿到數(shù)組,array
Object[] array = queue;
// 拿到0索引位置的數(shù)據(jù)
E result = (E) array[0];
// 拿到最后一個數(shù)據(jù)
E x = (E) array[n];
// 將最后一個位置置位null
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 元素個數(shù)-1,賦值size
size = n;
// 返回result
return result;
}
}
4.6.3 下移做平衡操作
一定要以局部的方式去查看樹結(jié)構(gòu)的變化,他是從跟節(jié)點往下找較小的一個子節(jié)點,將較小的子節(jié)點挪動到父節(jié)點位置,再將循環(huán)往下走,如果一來,整個二叉堆的結(jié)構(gòu)就可以保證了。
// k:默認進來是0
// x:代表二叉堆的最后一個數(shù)據(jù)
// array:數(shù)組
// n:最后一個索引
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {
// 健壯性校驗,取完第一個數(shù)據(jù),已經(jīng)沒數(shù)據(jù)了,那就不需要做平衡操作
if (n > 0) {
// 拿到最后一個數(shù)據(jù)的比較器
Comparable<? super T> key = (Comparable<? super T>)x;
// 因為二叉堆是一個二叉滿樹,所以在保證二叉堆結(jié)構(gòu)時,只需要做一半就可以
int half = n >>> 1;
// 做了超過一半,就不需要再往下找了。
while (k < half) {
// 找左子節(jié)點索引,一個公式,可以找到當前節(jié)點的左子節(jié)點
int child = (k << 1) + 1;
// 拿到左子節(jié)點的數(shù)據(jù)
Object c = array[child];
// 拿到右子節(jié)點索引
int right = child + 1;
// 確認有右子節(jié)點
// 判斷左節(jié)點是否大于右節(jié)點
if (right < n && c.compareTo(array[right]) > 0)
// 如果左大于右,那么c就執(zhí)行右
c = array[child = right];
// 比較最后一個節(jié)點是否小于當前的較小的子節(jié)點
if (key.compareTo((T) c) <= 0)
break;
// 將左右子節(jié)點較小的放到之前的父節(jié)點位置
array[k] = c;
// k重置到之前的子節(jié)點位置
k = child;
}
// 上面while循環(huán)搞定后,可以確認整個二叉堆中,數(shù)據(jù)已經(jīng)移動ok了,只差當前k的位置數(shù)據(jù)是null
// 將最后一個索引的數(shù)據(jù)放到k的位置
array[k] = key;
}
}
五、DelayQueue
5.1 DelayQueue介紹&應(yīng)用
DelayQueue就是一個延遲隊列,生產(chǎn)者寫入一個消息,這個消息還有直接被消費的延遲時間。
需要讓消息具有延遲的特性。
DelayQueue也是基于二叉堆結(jié)構(gòu)實現(xiàn)的,甚至本事就是基于PriorityQueue實現(xiàn)的功能。二叉堆結(jié)構(gòu)每次獲取的是棧頂?shù)臄?shù)據(jù),需要讓DelayQueue中的數(shù)據(jù),在比較時,跟根據(jù)延遲時間做比較,剩余時間最短的要放在棧頂。
查看DelayQueue類信息:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 發(fā)現(xiàn)DelayQueue中的元素,需要繼承Delayed接口。
}
// ==========================================
// 接口繼承了Comparable,這樣就具備了比較的能力。
public interface Delayed extends Comparable<Delayed> {
// 抽象方法,就是咱們需要設(shè)置的延遲時間
long getDelay(TimeUnit unit);
// Comparable接口提供的:public int compareTo(T o);
}
基于上述特點,聲明一個可以寫入DelayQueue的元素類
public class Task implements Delayed {
/** 任務(wù)的名稱 */
private String name;
/** 什么時間點執(zhí)行 */
private Long time;
/**
*
* @param name
* @param delay 單位毫秒。
*/
public Task(String name, Long delay) {
// 任務(wù)名稱
this.name = name;
this.time = System.currentTimeMillis() + delay;
}
/**
* 設(shè)置任務(wù)什么時候可以出延遲隊列
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 單位是毫秒,視頻里寫錯了,寫成了納秒,
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 兩個任務(wù)在插入到延遲隊列時的比較方式
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.time - ((Task)o).getTime());
}
}
在使用時,查看到DelayQueue底層用了PriorityQueue,在一定程度上,DelayQueue也是無界隊列。
測試效果
public static void main(String[] args) throws InterruptedException {
// 聲明元素
Task task1 = new Task("A",1000L);
Task task2 = new Task("B",5000L);
Task task3 = new Task("C",3000L);
Task task4 = new Task("D",2000L);
// 聲明阻塞隊列
DelayQueue<Task> queue = new DelayQueue<>();
// 將元素添加到延遲隊列中
queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
// 獲取元素
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
// A,D,C,B
}
在應(yīng)用時,外賣,15分鐘商家需要節(jié)點,如果不節(jié)點,這個訂單自動取消。
可以每下一個訂單,就放到延遲隊列中,如果規(guī)定時間內(nèi),商家沒有節(jié)點,直接通過消費者獲取元素,然后取消訂單。
只要是有需要延遲一定時間后,再執(zhí)行的任務(wù),就可以通過延遲隊列去實現(xiàn)。
5.2、DelayQueue核心屬性
可以查看到DelayQueue就四個核心屬性
// 因為DelayQueue依然屬于阻塞隊列,需要保證線程安全。看到只有一把鎖,生產(chǎn)者和消費者使用的是一個lock
private final transient ReentrantLock lock = new ReentrantLock();
// 因為DelayQueue還是基于二叉堆結(jié)構(gòu)實現(xiàn)的,沒有必要重新搞一個二叉堆,直接使用的PriorityQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般會存儲等待棧頂數(shù)據(jù)的消費者,在整體寫入和消費的過程中,會設(shè)置的leader的一些判斷。
private Thread leader = null;
// 生產(chǎn)者在插入數(shù)據(jù)時,不會阻塞的。當前的Condition就是給消費者用的
// 比如消費者在獲取數(shù)據(jù)時,發(fā)現(xiàn)棧頂?shù)臄?shù)據(jù)還又沒到延遲時間。
// 這個時候,咱們就需要將消費者線程掛起,阻塞一會,阻塞到元素到了延遲時間,或者是,生產(chǎn)者插入的元素到了棧頂,此時生產(chǎn)者會喚醒消費者。
private final Condition available = lock.newCondition();
5.3、DelayQueue寫入流程分析
Delay是無界的,數(shù)組可以動態(tài)的擴容,不需要關(guān)注生產(chǎn)者的阻塞問題,他就沒有阻塞問題。
這里只需要查看offer方法即可。
public boolean offer(E e) {
// 直接獲取lock,加鎖。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 直接調(diào)用PriorityQueue的插入方法,這里會根據(jù)之前重寫Delayed接口中的compareTo方法做排序,然后調(diào)整上移和下移操作。
q.offer(e);
// 調(diào)用優(yōu)先級隊列的peek方法,拿到堆頂?shù)臄?shù)據(jù)
// 拿到堆頂數(shù)據(jù)后,判斷是否是剛剛插入的元素
if (q.peek() == e) {
// leader賦值為null。在消費者的位置再提一嘴
leader = null;
// 喚醒消費者,避免剛剛插入的數(shù)據(jù)的延遲時間出現(xiàn)問題。
available.signal();
}
// 插入成功,
return true;
} finally {
// 釋放鎖
lock.unlock();
}
}
5.4、DelayQueue讀取流程分析
消費者依然還是存在阻塞的情況,因為有兩個情況
- 消費者要拿到棧頂數(shù)據(jù),但是延遲時間還沒到,此時消費者需要等待一會。
- 消費者要來拿數(shù)據(jù),但是發(fā)現(xiàn)已經(jīng)有消費者在等待棧頂數(shù)據(jù)了,這個后來的消費者也需要等待一會。
依然需要查看四個方法的實現(xiàn)
5.4.1 remove方法
// 依然是AbstractQueue提供的方法,有結(jié)果就返回,沒結(jié)果扔異常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
5.4.2 poll方法
// poll是淺嘗一下,不會阻塞消費者,能拿就拿,拿不到就拉倒
public E poll() {
// 消費者和生產(chǎn)者是一把鎖,先拿鎖,加鎖。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 拿到棧頂數(shù)據(jù)。
E first = q.peek();
// 如果元素為null,直接返回null
// 如果getDelay方法返回的結(jié)果是大于0的,那說明當前元素還每到延遲時間,元素無法返回,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 到這說明元素不為null,并且已經(jīng)達到了延遲時間,直接調(diào)用優(yōu)先級隊列的poll方法
return q.poll();
} finally {
// 釋放鎖。
lock.unlock();
}
}
5.4.3 poll(time,unit)方法
這個是允許阻塞的,并且指定一定的時間
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 先將時間轉(zhuǎn)為納秒
long nanos = unit.toNanos(timeout);
// 拿鎖,加鎖。
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 死循環(huán)。
for (;;) {
// 拿到堆頂數(shù)據(jù)
E first = q.peek();
// 如果元素為null
if (first == null) {
// 并且等待的時間小于等于0。不能等了,直接返回null
if (nanos <= 0)
return null;
// 說明當前線程還有可以阻塞的時間,阻塞指定時間即可。
else
// 這里掛起線程后,說明隊列沒有元素,在生產(chǎn)者添加數(shù)據(jù)之后,會喚醒
nanos = available.awaitNanos(nanos);
// 到這說明,有數(shù)據(jù)
} else {
// 有數(shù)據(jù)的話,先獲取數(shù)據(jù)現(xiàn)在是否可以執(zhí)行,延遲時間是否已經(jīng)到了指定時間
long delay = first.getDelay(NANOSECONDS);
// 延遲時間是否已經(jīng)到了,
if (delay <= 0)
// 時間到了,直接執(zhí)行優(yōu)先級隊列的poll方法,返回元素
return q.poll();
// ==================延遲時間沒到,消費者需要等一會===================
// 這個是查看消費者可以等待的時間,
if (nanos <= 0)
// 直接返回nulll
return null;
// ==================延遲時間沒到,消費者可以等一會===================
// 把first賦值為null
first = null;
// 如果等待的時間,小于元素剩余的延遲時間,消費者直接掛起。反正暫時拿不到,但是不能保證后續(xù)是否有生產(chǎn)者添加一個新的數(shù)據(jù),我是可以拿到的。
// 如果已經(jīng)有一個消費者在等待堆頂數(shù)據(jù)了,我這邊不做額外操作,直接掛起即可。
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
// 當前消費者的阻塞時間可以拿到數(shù)據(jù),并且沒有其他消費者在等待堆頂數(shù)據(jù)
else {
// 拿到當前消費者的線程對象
Thread thisThread = Thread.currentThread();
// 將leader設(shè)置為當前線程
leader = thisThread;
try {
// 會讓當前消費者,阻塞這個元素的延遲時間
long timeLeft = available.awaitNanos(delay);
// 重新計算當前消費者剩余的可阻塞時間,。
nanos -= delay - timeLeft;
} finally {
// 到了時間,將leader設(shè)置為null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 沒有消費者在等待元素,隊列中的元素不為null
if (leader == null && q.peek() != null)
// 只要當前沒有l(wèi)eader在等,并且隊列有元素,就需要再次喚醒消費者。、
// 避免隊列有元素,但是沒有消費者處理的問題
available.signal();
// 釋放鎖
lock.unlock();
}
}
5.4.4 take方法
這個是允許阻塞的,但是可以一直等,要么等到元素,要么等到被中斷。
public E take() throws InterruptedException {
// 正常加鎖,并且允許中斷
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 拿到元素
E first = q.peek();
if (first == null)
// 沒有元素掛起。
available.await();
else {
// 有元素,獲取延遲時間。
long delay = first.getDelay(NANOSECONDS);
// 判斷延遲時間是不是已經(jīng)到了
if (delay <= 0)
// 基于優(yōu)先級隊列的poll方法返回
return q.poll();
first = null;
// 如果有消費者在等,就正常await掛起
if (leader != null)
available.await();
// 如果沒有消費者在等的堆頂數(shù)據(jù),我來等
else {
// 獲取當前線程
Thread thisThread = Thread.currentThread();
// 設(shè)置為leader,代表等待堆頂?shù)臄?shù)據(jù)
leader = thisThread;
try {
// 等待指定(堆頂元素的延遲時間)時長,
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
// leader賦值null
leader = null;
}
}
}
}
} finally {
// 避免消費者無限等,來一個喚醒消費者的方法,一般是其他消費者拿到元素走了之后,并且延遲隊列還有元素,就執(zhí)行if內(nèi)部喚醒方法
if (leader == null && q.peek() != null)
available.signal();
// 釋放鎖
lock.unlock();
}
}
六、SynchronousQueue
6.1 SynchronousQueue介紹
SynchronousQueue這個阻塞隊列和其他的阻塞隊列有很大的區(qū)別
在咱們的概念中,隊列肯定是要存儲數(shù)據(jù)的,但是SynchronousQueue不會存儲數(shù)據(jù)的
SynchronousQueue隊列中,他不存儲數(shù)據(jù),存儲生產(chǎn)者或者是消費者
當存儲一個生產(chǎn)者到SynchronousQueue隊列中之后,生產(chǎn)者會阻塞(看你調(diào)用的方法)
生產(chǎn)者最終會有幾種結(jié)果:
- 如果在阻塞期間有消費者來匹配,生產(chǎn)者就會將綁定的消息交給消費者
- 生產(chǎn)者得等阻塞結(jié)果,或者不允許阻塞,那么就直接失敗
- 生產(chǎn)者在阻塞期間,如果線程中斷,直接告辭。
同理,消費者和生產(chǎn)者的效果是一樣。
生產(chǎn)者和消費者的數(shù)據(jù)是直接傳遞的,不會經(jīng)過SynchronousQueue。
SynchronousQueue是不會存儲數(shù)據(jù)的。
經(jīng)過阻塞隊列的學習:
生產(chǎn)者:
- offer():生產(chǎn)者在放到SynchronousQueue的同時,如果有消費者在等待消息,直接配對。如果沒有消費者在等待消息,這里直接返回,告辭。
- offer(time,unit):生產(chǎn)者在放到SynchronousQueue的同時,如果有消費者在等待消息,直接配對。如果沒有消費者在等待消息,阻塞time時間,如果還沒有,告辭。
- put():生產(chǎn)者在放到SynchronousQueue的同時,如果有消費者在等待消息,直接配對。如果沒有,死等。
消費者:poll(),poll(time,unit),take()。道理和上面的生產(chǎn)者一致。
測試效果:
public static void main(String[] args) throws InterruptedException {
// 因為當前隊列不存在數(shù)據(jù),沒有長度的概念。
SynchronousQueue queue = new SynchronousQueue();
String msg = "消息!";
/*new Thread(() -> {
// b = false:代表沒有消費者來拿
boolean b = false;
try {
b = queue.offer(msg,1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(b);
}).start();
Thread.sleep(100);
new Thread(() -> {
System.out.println(queue.poll());
}).start();*/
new Thread(() -> {
try {
System.out.println(queue.poll(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(100);
new Thread(() -> {
queue.offer(msg);
}).start();
}
6.2 SynchronousQueue核心屬性
進到SynchronousQueue類的內(nèi)部后,發(fā)現(xiàn)了一個內(nèi)部類,Transferer,內(nèi)部提供了一個transfer的方法
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
當前這個類中提供的transfer方法,就是生產(chǎn)者和消費者在調(diào)用讀寫數(shù)據(jù)時要用到的核心方法。
生產(chǎn)者在調(diào)用上述的transfer方法時,第一個參數(shù)e會正常傳遞數(shù)據(jù)
消費者在調(diào)用上述的transfer方法時,第一個參數(shù)e會傳遞null
SynchronousQueue針對抽象類Transferer做了幾種實現(xiàn)。
一共看到了兩種實現(xiàn)方式:
- TransferStack
- TransferQueue
這兩種類繼承了Transferer抽象類,在構(gòu)建SynchronousQueue時,會指定使用哪種子類
// 到底采用哪種實現(xiàn),需要把對應(yīng)的對象存放到這個屬性中
private transient volatile Transferer<E> transferer;
// 采用無參時,會調(diào)用下述方法,再次調(diào)用有參構(gòu)造傳入false
public SynchronousQueue() {
this(false);
}
// 調(diào)用的是當前的有參構(gòu)造,fair代表公平還是不公平
public SynchronousQueue(boolean fair) {
// 如果是公平,采用Queue,如果是不公平,采用Stack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue的特點
代碼查看效果
public static void main(String[] args) throws InterruptedException {
// 因為當前隊列不存在數(shù)據(jù),沒有長度的概念。
SynchronousQueue queue = new SynchronousQueue(true);
SynchronousQueue queue = new SynchronousQueue(false);
new Thread(() -> {
try {
queue.put("生1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
queue.put("生2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
queue.put("生3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(100);
new Thread(() -> {
System.out.println("消1:" + queue.poll());
}).start();
Thread.sleep(100);
new Thread(() -> {
System.out.println("消2:" + queue.poll());
}).start();
Thread.sleep(100);
new Thread(() -> {
System.out.println("消3:" + queue.poll());
}).start();
}
6.3 SynchronousQueue的TransferQueue源碼
為了查看清除SynchronousQueue的TransferQueue源碼,需要從兩點開始查看源碼信息文章來源:http://www.zghlxwxcb.cn/news/detail-642523.html
6.3.1 QNode源碼信息
static final class QNode {
// 當前節(jié)點可以獲取到next節(jié)點
volatile QNode next;
// item在不同情況下效果不同
// 生產(chǎn)者:有數(shù)據(jù)
// 消費者:為null
volatile Object item;
// 當前線程
volatile Thread waiter;
// 當前屬性是用來區(qū)分消費者和生產(chǎn)者的屬性
final boolean isData;
// 最終生產(chǎn)者需要將item交給消費者
// 最終消費者需要獲取生產(chǎn)者的item
// 省略了大量提供的CAS操作
....
}
6.3.2 transfer方法實現(xiàn)
// 當前方法是TransferQueue的核心內(nèi)容
// e:傳遞的數(shù)據(jù)
// timed:false,代表無限阻塞,true,代表阻塞nacos時間
E transfer(E e, boolean timed, long nanos) {
// 當前QNode是要封裝當前生產(chǎn)者或者消費者的信息
QNode s = null;
// isData == true:代表是生產(chǎn)者
// isData == false:代表是消費者
boolean isData = (e != null);
// 死循環(huán)
for (;;) {
// 獲取尾節(jié)點和頭結(jié)點
QNode t = tail;
QNode h = head;
// 為了避免TransferQueue還沒有初始化,這邊做一個健壯性判斷
if (t == null || h == null)
continue;
// 如果滿足h == t 條件,說明當前隊列沒有生產(chǎn)者或者消費者,為空
// 如果有節(jié)點,同時當前節(jié)點和隊列節(jié)點屬于同一種角色。
// if中的邏輯是進到隊列
if (h == t || t.isData == isData) {
// ===================在判斷并發(fā)問題==========================
// 拿到尾節(jié)點的next
QNode tn = t.next;
// 如果t不為尾節(jié)點,進來說明有其他線程并發(fā)修改了tail
if (t != tail)
// 重新走for循環(huán)
continue;
// tn如果為不null,說明前面有線程并發(fā),添加了一個節(jié)點
if (tn != null) {
// 直接幫助那個并發(fā)線程修改tail的指向
advanceTail(t, tn);
// 重新走for循環(huán)
continue;
}
// 獲取當前線程是否可以阻塞
// 如果timed為true,并且阻塞的時間小于等于0
// 不需要匹配,直接告辭?。?!
if (timed && nanos <= 0)
return null;
// 如果可以阻塞,將當前需要插入到隊列的QNode構(gòu)建出來
if (s == null)
s = new QNode(e, isData);
// 基于CAS操作,將tail節(jié)點的next設(shè)置為當前線程
if (!t.casNext(null, s))
// 如果進到if,說明修改失敗,重新執(zhí)行for循環(huán)修改
continue;
// CAS操作成功,直接替換tail的指向
advanceTail(t, s);
// 如果進到隊列中了,掛起線程,要么等生產(chǎn)者,要么等消費者。
// x是返回替換后的數(shù)據(jù)
Object x = awaitFulfill(s, e, timed, nanos);
// 如果元素和節(jié)點相等,說明節(jié)點取消了
if (x == s) {
// 清空當前節(jié)點,將上一個節(jié)點的next指向當前節(jié)點的next,直接告辭
clean(t, s);
return null;
}
// 判斷當前節(jié)點是否還在隊列中
if (!s.isOffList()) {
// 將當前節(jié)點設(shè)置為head
advanceHead(t, s);
// 如果 x != null, 如果拿到了數(shù)據(jù),說明我是消費者
if (x != null)
// 將當前節(jié)點的item設(shè)置為自己
s.item = s;
// 線程置位null
s.waiter = null;
}
// 返回數(shù)據(jù)
return (x != null) ? (E)x : e;
}
// 匹配隊列中的橘色
else {
// 拿到head的next,作為要匹配的節(jié)點
QNode m = h.next;
// 做并發(fā)判斷,如果頭節(jié)點,尾節(jié)點,或者head.next發(fā)生了變化,這邊要重新走for循環(huán)
if (t != tail || m == null || h != head)
continue;
// 沒并發(fā)問題,可以拿數(shù)據(jù)
// 拿到m節(jié)點的item作為x。
Object x = m.item;
// 如果isData == (x != null)滿足,說明當前出現(xiàn)了并發(fā)問題,避免并發(fā)消費出現(xiàn)坑
if (isData == (x != null) ||
// 如果排隊的節(jié)點取消,就會講當前QNode中的item指向QNode
x == m ||
// 如果前面兩個都沒滿足,可以交換數(shù)據(jù)了。
// 如果交換失敗,說明有并發(fā)問題,
!m.casItem(x, e)) {
// 重新設(shè)置head節(jié)點,并且再走一次循環(huán)
advanceHead(h, m);
continue;
}
// 替換head
advanceHead(h, m);
// 喚醒head.next中的線程
LockSupport.unpark(m.waiter);
// 這邊匹配好了,數(shù)據(jù)也交換了,直接返回
// 如果 x != null,說明隊列中是生產(chǎn)者,當前是消費者,這邊直接返回x具體數(shù)據(jù)
// 反之,隊列中是消費者,當前是生產(chǎn)者,直接返回自己的數(shù)據(jù)
return (x != null) ? (E)x : e;
}
}
}
6.3.3 tansfer方法流程圖
文章來源地址http://www.zghlxwxcb.cn/news/detail-642523.html
到了這里,關(guān)于多線程與高并發(fā)--------阻塞隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!