阻塞隊(duì)列BlockingQueue
隊(duì)列
先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),允許出隊(duì)的一端稱為隊(duì)頭,允許入隊(duì)的一端稱為隊(duì)尾。
在java中為隊(duì)列定義了一個(gè)接口規(guī)范Queue
接口
public interface Queue<E> extends Collection<E> {
//添加一個(gè)元素,添加成功返回true, 如果隊(duì)列滿了,就會(huì)拋出異常
boolean add(E e);
//添加一個(gè)元素,添加成功返回true, 如果隊(duì)列滿了,返回false
boolean offer(E e);
//返回并刪除隊(duì)首元素,隊(duì)列為空則拋出異常
E remove();
//返回并刪除隊(duì)首元素,隊(duì)列為空則返回null
E poll();
//返回隊(duì)首元素,但不移除,隊(duì)列為空則拋出異常
E element();
//獲取隊(duì)首元素,但不移除,隊(duì)列為空則返回null
E peek();
}
阻塞隊(duì)列
阻塞隊(duì)列提供了線程安全的隊(duì)列訪問方式。多線程入隊(duì)互斥,多線程出隊(duì)互斥,隊(duì)列滿了阻塞生產(chǎn)者線程,隊(duì)列空了阻塞消費(fèi)者線程
阻塞隊(duì)列的接口BlockingQueue
它繼承了 Queue
接口。并在原有Queue
接口定義的方法規(guī)范下,再新增了兩個(gè)關(guān)于阻塞的方法規(guī)范。
public interface BlockingQueue<E> extends Queue<E> {
// 阻塞方式 入隊(duì)
void put(E e) throws InterruptedException;
// 阻塞方式 出隊(duì)
E take() throws InterruptedException;
......
}
方法 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞特定時(shí)間 |
---|---|---|---|---|
入隊(duì) | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出隊(duì) | remove() | poll() | take() | poll(time, unit) |
獲取隊(duì)首元素 | element() | peek() | 不支持 | 不支持 |
阻塞隊(duì)列的應(yīng)用場(chǎng)景
- 線程池
- 生產(chǎn)者-消費(fèi)者模型
- 消息隊(duì)列
- 緩存系統(tǒng)
- 并發(fā)任務(wù)處理
BlockingQueue 接口的實(shí)現(xiàn)類都被放在了 juc 包中,它們的區(qū)別主要體現(xiàn)在存儲(chǔ)結(jié)構(gòu)上或?qū)υ夭僮魃系牟煌?,但是?duì)于take與put操作的原理卻是類似的。
隊(duì)列 | 描述 |
---|---|
ArrayBlockingQueue | 基于數(shù)組結(jié)構(gòu)實(shí)現(xiàn)的一個(gè)有界阻塞隊(duì)列 |
LinkedBlockingQueue | 基于鏈表結(jié)構(gòu)實(shí)現(xiàn)的一個(gè)無界阻塞隊(duì)列,指定容量為有界阻塞隊(duì)列 |
PriorityBlockingQueue | 支持按優(yōu)先級(jí)排序的無界阻塞隊(duì)列 |
DelayQueue | 基于優(yōu)先級(jí)隊(duì)列(PriorityBlockingQueue)實(shí)現(xiàn)的無界阻塞隊(duì)列 |
SynchronousQueue | 不存儲(chǔ)元素的阻塞隊(duì)列 |
LinkedTransferQueue | 基于鏈表結(jié)構(gòu)實(shí)現(xiàn)的一個(gè)無界阻塞隊(duì)列 |
LinkedBlockingDeque | 基于鏈表結(jié)構(gòu)實(shí)現(xiàn)的一個(gè)雙端阻塞隊(duì)列 |
詳情
ArrayBlockingQueue
簡(jiǎn)介
ArrayBlockingQueue它隊(duì)列使用的數(shù)據(jù)結(jié)構(gòu)是雙指針的環(huán)形數(shù)組,入隊(duì)出隊(duì)是基于生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)的,它入隊(duì)和出隊(duì)使用的是同一把鎖。
基本使用文章來源:http://www.zghlxwxcb.cn/news/detail-406720.html
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1"); //向隊(duì)列中添加元素
Object object = queue.take(); //從隊(duì)列中取出元素
其實(shí)ava.util.concurrent.locks.Condition
接口的注釋中 給出的案例就和ArrayBlockingQueue的實(shí)現(xiàn)很相似,如下所示使用一個(gè)環(huán)形數(shù)組實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式:文章來源地址http://www.zghlxwxcb.cn/news/detail-406720.html
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生產(chǎn)者,把方法的入?yún)⑼?duì)列中存,隊(duì)列是一個(gè)環(huán)形數(shù)組
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// 隊(duì)列放滿了生產(chǎn)者就阻塞
while (count == items.length)
notFull.await();
// 往隊(duì)列存數(shù)據(jù)
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
// 隊(duì)列中新存值了,要通知消費(fèi)者開始消費(fèi)了
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 消費(fèi)者,從隊(duì)列中取
public Object take() throws InterruptedException {
lock.lock();
try {
// 隊(duì)列空了,消費(fèi)者就阻塞
while (count == 0)
notEmpty.await();
// 從隊(duì)列中取值
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
// 已經(jīng)消費(fèi)了,隊(duì)列現(xiàn)在不是滿的了,喚醒生產(chǎn)者進(jìn)行生產(chǎn)
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
到了這里,關(guān)于阻塞隊(duì)列BlockingQueue的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!