四、阻塞隊(duì)列
1 基礎(chǔ)概念
1.1 生產(chǎn)者消費(fèi)者概念
生產(chǎn)者-消費(fèi)者是設(shè)計(jì)模式的一種,讓生產(chǎn)者和消費(fèi)者基于一個(gè)容器來(lái)解決強(qiáng)耦合的問(wèn)題。生產(chǎn)者與消費(fèi)者彼此之間不會(huì)直接通訊,而是通過(guò)一個(gè)容器(隊(duì)列)進(jìn)行通訊。
- 生產(chǎn)者生產(chǎn)完數(shù)據(jù)后扔到容器中,不用等消費(fèi)者來(lái)處理;
- 消費(fèi)者也不需要去找生產(chǎn)者要數(shù)據(jù),直接從容器中獲取即可;
- 而這種容器最常用的結(jié)構(gòu)就是隊(duì)列。
1.2 JUC阻塞隊(duì)列的存取方法
常用的存取方法都來(lái)自 JUC 包下的 BlockingQueue
- 生產(chǎn)者存儲(chǔ)方法:
- add(E):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,拋出異常;
- offer(E):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,返回 false;
- offer(E,timeout,unit):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,阻塞 timeout 時(shí)間,超時(shí)后返回 false;
- put(E):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,掛起線(xiàn)程,等到隊(duì)列中有位置,再扔數(shù)據(jù)進(jìn)去,死等。
- 消費(fèi)者取數(shù)據(jù)方法:
- remove():從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,拋出異常;
- poll():從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,返回 false;
- poll(timeout,unit):從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,阻塞 timeout 時(shí)間,等生產(chǎn)者仍數(shù)據(jù)再獲取數(shù)據(jù),超時(shí)后返回 false;
- take():從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,掛起線(xiàn)程,一直等生產(chǎn)者仍數(shù)據(jù)再獲取。
2 ArrayBlockingQueue
2.1 ArrayBlockingQueue的基本使用
- ArrayBlockingQueue 在初始化時(shí),必須指定當(dāng)前隊(duì)列的長(zhǎng)度,因?yàn)?ArrayBlockingQueue 是基于數(shù)組實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu),數(shù)組長(zhǎng)度不可變,必須提前設(shè)置數(shù)據(jù)長(zhǎng)度信息。
public static void main(String[] args) throws InterruptedException {
// 必須設(shè)置隊(duì)列長(zhǎng)度
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
// 生產(chǎn)者生產(chǎn)數(shù)據(jù)
queue.add("1");
queue.offer("2");
queue.offer("3", 2, TimeUnit.SECONDS);
queue.put("4");
// 消費(fèi)者消費(fèi)數(shù)據(jù)
System.out.println(queue.remove());
System.out.println(queue.poll());
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.take());
}
2.2 生產(chǎn)者方法實(shí)現(xiàn)原理
- 生產(chǎn)者添加數(shù)據(jù)到隊(duì)列的方法比較多,需要一個(gè)一個(gè)看
2.2.1 ArrayBlockingQueue的常見(jiàn)屬性
ArrayBlockingQueue中的成員變量
final Object[] items; // 就是數(shù)組本身
int takeIndex; // 取數(shù)據(jù)的下標(biāo)
int putIndex; // 存數(shù)據(jù)的下標(biāo)
int count; // 當(dāng)前數(shù)組中元素的個(gè)數(shù)
final ReentrantLock lock; // 就是一個(gè) ReentrantLock 鎖
private final Condition notEmpty; // 消費(fèi)者掛起線(xiàn)程和喚醒線(xiàn)程用到的Condition(可看作是synchronized的wait和notify)
private final Condition notFull; // 生產(chǎn)者掛起線(xiàn)程和喚醒線(xiàn)程用到的Condition(可看作是synchronized的wait和notify)
2.2.2 add方法
- add方法本身就是調(diào)用了offer方法,如果offer方法返回false,直接拋出異常
public boolean add(E e) {
if (offer(e))
return true;
else // 拋出的異常
throw new IllegalStateException("Queue full");
}
2.2.3 offer方法
public boolean offer(E e) {
checkNotNull(e); // 要求存儲(chǔ)的數(shù)據(jù)不允許為null,否則拋出空指針異常
// 拿到當(dāng)前阻塞隊(duì)列的lock鎖
final ReentrantLock lock = this.lock;
lock.lock(); // 為保證線(xiàn)程安全,加鎖
try {
// 判斷隊(duì)列中元素是否滿(mǎn)了,若滿(mǎn)了,則返回false
if (count == items.length)
return false;
else {
// 隊(duì)列沒(méi)滿(mǎn),執(zhí)行 enqueue 將元素添加到隊(duì)列中,并返回true
enqueue(e);
return true;
}
} finally {
lock.unlock(); // 操作完釋放鎖
}
}
// ================
private void enqueue(E x) {
// 拿到數(shù)組的引用,將元素放到指定的位置
final Object[] items = this.items;
items[putIndex] = x;
// 對(duì)putIndex進(jìn)行++操作,并判斷是否等于數(shù)組長(zhǎng)度,需要?dú)w為
if (++putIndex == items.length)
putIndex = 0; // 歸位:將索引值設(shè)置為0
count++; // 添加成功,數(shù)據(jù)++
notEmpty.signal(); // 將一個(gè)Condition中阻塞的線(xiàn)程喚醒
}
2.2.4 offer(time,unit)方法
生產(chǎn)者在添加數(shù)據(jù)時(shí),如果隊(duì)列已經(jīng)滿(mǎn),阻塞一會(huì):文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-694621.html
- 阻塞到消費(fèi)者消費(fèi)了消息,然后喚醒當(dāng)前阻塞線(xiàn)程;
- 阻塞到了 timeout 時(shí)間,再次判斷是否可以添加,若不能直接告辭。
// 線(xiàn)程在掛起時(shí),如果對(duì)當(dāng)前阻塞線(xiàn)程的終端標(biāo)記位進(jìn)行設(shè)置,會(huì)拋出異常直接結(jié)束
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 非空校驗(yàn)
checkNotNull(e);
long nanos = unit.toNanos(timeout); // 將時(shí)間單位轉(zhuǎn)為納秒
final ReentrantLock lock = this.lock; // 加鎖
lock.lockInterruptibly(); // 允許線(xiàn)程中斷排除異常的加鎖方法
try {
// 為什么是while(虛假喚醒)
while (count == items.length) {
// 如果元素個(gè)數(shù)和數(shù)組長(zhǎng)度一致,說(shuō)明隊(duì)列滿(mǎn)了
if (nanos <= 0) // 判斷等待時(shí)間是否充裕
return false; // 不充裕,直接添加失敗,返回false
// 掛起等待,會(huì)同時(shí)釋放鎖資源(對(duì)標(biāo) synchronized 的wait方法)
// awaitNanos會(huì)掛起線(xiàn)程,并且返回剩余的阻塞時(shí)間,恢復(fù)執(zhí)行時(shí),需要重新獲取鎖資源
nanos = notFull.awaitNanos(nanos);
}
enqueue(e); // 這里鎖門(mén)隊(duì)列有空間了,enqueue將數(shù)據(jù)添加到阻塞隊(duì)列中,并返回true
return true;
} finally {
lock.unlock(); // 是否鎖資源
}
}
2.2.5 put方法
- 如果隊(duì)列是滿(mǎn)的,就一直掛起,直到被喚醒,或者被中斷
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// await方法會(huì)一直阻塞,直到被喚醒或者被中斷
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
2.3 消費(fèi)者方法實(shí)現(xiàn)原理
2.3.1 remove方法
- remove方法本身就是調(diào)用了poll方法,如果poll方法返回null,直接拋出異常
public E remove() {
E x = poll();
if (x != null)
return x;
else // 沒(méi)數(shù)據(jù)拋出異常
throw new NoSuchElementException();
}
2.3.2 poll方法
// 拉取數(shù)據(jù)
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock(); // 加鎖
try {
// 若沒(méi)有數(shù)據(jù),直接返回null;否則執(zhí)行dequeue,取出數(shù)據(jù)并返回
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 取出數(shù)據(jù)
private E dequeue() {
// 將成員變量引用到局部變量
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; // 直接獲取指定索引位置的數(shù)據(jù)
items[takeIndex] = null; // 取出數(shù)據(jù)后,清空該索引位置
if (++takeIndex == items.length) // 設(shè)置下次取數(shù)據(jù)的索引位置
takeIndex = 0;
count--; // 數(shù)組中元素個(gè)數(shù)減一
if (itrs != null) // 迭代器內(nèi)容先跳過(guò)
itrs.elementDequeued();
// signal方法,會(huì)喚醒當(dāng)前Condition中排隊(duì)的一個(gè)Node
// signalAll方法,會(huì)將Condition中所有的Node,全都喚醒
notFull.signal();
return x; // 返回?cái)?shù)據(jù)
}
2.3.3 poll(timeout,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout); // 轉(zhuǎn)換時(shí)間單位
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加鎖,可中斷喚醒
try {
while (count == 0) {
// 如果沒(méi)數(shù)據(jù)
if (nanos <= 0) // 也沒(méi)時(shí)間了,就不阻塞,返回null
return null;
// 有時(shí)間,就掛起消費(fèi)者線(xiàn)程一段時(shí)間
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue(); // 取數(shù)據(jù)
} finally {
lock.unlock();
}
}
2.3.4 take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 使用while,防止虛假喚醒
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
2.3.5 虛假喚醒
阻塞隊(duì)列中,如果需要線(xiàn)程掛起操作,判斷有無(wú)數(shù)據(jù)的位置采用的是while循環(huán),為什么不使用if?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-694621.html
- 首先肯定不能換成 if 邏輯判斷,比如:有線(xiàn)程 A、B、E、C,其中 ABE 是生產(chǎn)者,C是消費(fèi)者。假如線(xiàn)程的隊(duì)列是滿(mǎn)的,AB掛起
// E,拿到鎖資源,還沒(méi)有走while判斷
while (count == items.length)
// A醒了
// B掛起
notFull.await();
enqueue(e);
- C 此時(shí)消費(fèi)一條數(shù)據(jù),執(zhí)行 notFull.signal() 喚醒一個(gè)線(xiàn)程,A線(xiàn)程被喚醒;E走判斷發(fā)現(xiàn)有空余位置,可以添加數(shù)據(jù)到隊(duì)列,則E添加數(shù)據(jù),走enqueue。
- 如果判斷是 if,A 在E釋放鎖資源后,拿到鎖資源,直接走 enqueue 方法,此時(shí) A線(xiàn)程就是在 putIndex 的位置,覆蓋掉之前的數(shù)據(jù),會(huì)造成數(shù)據(jù)安全問(wèn)題。
3 LinkedBlockingQueue
3.1 LinkedBlockingQueue的底層實(shí)現(xiàn)
- 查看 LinkedBlockingQueue 是如何存儲(chǔ)數(shù)據(jù),以及如何實(shí)現(xiàn)鏈表結(jié)構(gòu)的。
// Node對(duì)象就是存儲(chǔ)數(shù)據(jù)的單位
static class Node<E> {
// 存儲(chǔ)的數(shù)據(jù)
E item;
// 指向下一個(gè)數(shù)據(jù)的指針
Node<E> next;
// 有參構(gòu)造
Node(E x) {
item = x; }
}
- 查看LinkedBlockingQueue的有參構(gòu)造
// 可以手動(dòng)指定LinkedBlockingQueue的長(zhǎng)度,如果沒(méi)有指定,默認(rèn)為Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 在初始化時(shí),構(gòu)建一個(gè)item為null的節(jié)點(diǎn),作為head和last,這種node可以成為哨兵Node,
// 如果沒(méi)有哨兵節(jié)點(diǎn),那么在獲取數(shù)據(jù)時(shí),需要判斷head是否為null,才能找next
// 如果沒(méi)有哨兵節(jié)點(diǎn),那么在添加數(shù)據(jù)時(shí),需要判斷l(xiāng)ast是否為null,才能找next
last = head = new Node<E>(null);
}
- 查看LinkedBlockingQueue的其他屬性
// 因?yàn)槭擎湵?,沒(méi)有想數(shù)組的length屬性,基于AtomicInteger來(lái)記錄長(zhǎng)度
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head; // 鏈表的頭
到了這里,關(guān)于多線(xiàn)程與高并發(fā)——并發(fā)編程(4)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!