目錄
?什么是阻塞隊列
?什么是生產(chǎn)-消費者模式
?實現(xiàn)一個阻塞隊列
?BlockingQueue
?什么是阻塞隊列
阻塞隊列是一種特殊的隊列,它除了具備隊列的先進先出的特點外,還具有以下特點:
?.如果隊列為空時,執(zhí)行出隊列操作,會阻塞等待,直到另一個線程往隊列里添加元素(隊列不為空)
?.如果隊列滿了時,執(zhí)行入隊列操作,會阻塞等待,直到另一個線程取出隊列里的元素(隊列沒有滿)
阻塞隊列常用于實現(xiàn)生產(chǎn)者-消費者模式,通過控制隊列的阻塞,使得生產(chǎn)者和消費者的協(xié)作更加有效和高效。下面我們先簡單認識下生產(chǎn)者-消費者模式~
?什么是生產(chǎn)-消費者模式
生產(chǎn)者消費者模式是一種常見的并發(fā)編程模式。該模式主要針對一個共享的緩沖區(qū),多個生產(chǎn)者可以向緩沖區(qū)中添加數(shù)據(jù),多個消費者可以從緩沖區(qū)中獲取數(shù)據(jù)。在該模式中,生產(chǎn)者向緩沖區(qū)中添加數(shù)據(jù),如果緩沖區(qū)已滿,則生產(chǎn)者需要等待消費者消費完一部分數(shù)據(jù)后再向緩沖區(qū)添加數(shù)據(jù);消費者從緩沖區(qū)中獲取數(shù)據(jù),如果緩沖區(qū)為空,則消費者需要等待生產(chǎn)者生產(chǎn)數(shù)據(jù)后再從緩沖區(qū)中獲取數(shù)據(jù)。生產(chǎn)者和消費者之間通過緩沖區(qū)進行通信和協(xié)調(diào),而緩沖區(qū)就可以由阻塞隊列構(gòu)成。
一個典型的生產(chǎn)者-消費者模式的例子是吃酒席的時候:后廚(生產(chǎn)者)端菜到餐桌(阻塞隊列)上,賓客(消費者)從餐桌(阻塞隊列)上取走事物食用,當餐桌放滿了食物,后廚就要等賓客吃完一碟再上,當餐桌上沒有食物,賓客就得等后廚上菜后再吃。
使用生產(chǎn)者-消費者模式有以下兩點好處:
?.有效降低生產(chǎn)者和消費者之間的耦合:生產(chǎn)者和消費者之間通過阻塞隊列進行交互,彼此之間沒有直接關(guān)聯(lián)
?.平衡生產(chǎn)者和消費者之間的速度差異:由于阻塞隊列的特性,平衡了兩者的速度差異,保證了系統(tǒng)的穩(wěn)定性
?實現(xiàn)一個阻塞隊列
我們先通過數(shù)組,實現(xiàn)一個普通的循環(huán)隊列:
public class MyBlockingQueue { private int[] items; private int head; private int tail; private int size; public MyBlockingQueue() { items = new int[1000]; head = 0; tail = 0; //隊列中元素的有效個數(shù) size = 0; } //入隊列 public void put(int value) { //隊列滿了 if (size == items.length) { return; } //隊列沒滿 items[tail] = value; tail = (tail + 1) % items.length; size++; } //出隊列 public int take() { //隊列為空 if (size == 0) { return -1; } //隊列不為空 int value = items[head]; head = (head + 1) % items.length; size--; return value; } }
再對隊列填加阻塞功能,并保證線程安全:
public class MyBlockingQueue { private int[] items; private int head; private int tail; private volatile int size; public MyBlockingQueue() { items = new int[1000]; head = 0; tail = 0; //隊列中元素的有效個數(shù) size = 0; } //入隊列 public void put(int value) throws InterruptedException { synchronized (this) { //隊列滿了 // 此處最好使用 while,否則 notifyAll 的時候, 該線程從 wait 中被喚醒 // 但是緊接著并未搶占到鎖. 當鎖被搶占的時候, 可能又已經(jīng)隊列滿了,就只能繼續(xù)等待 while (size == items.length) { wait(); } //隊列沒滿 items[tail] = value; tail = (tail + 1) % items.length; size++; notifyAll(); } } //出隊列 public int take() throws InterruptedException { int value = 0; synchronized (this) { //隊列為空 // 此處最好使用 while,否則 notifyAll 的時候, 該線程從 wait 中被喚醒 // 但是緊接著并未搶占到鎖. 當鎖被搶占的時候, 可能又已經(jīng)隊列滿了,就只能繼續(xù)等待 while (size == 0) { wait(); } //隊列不為空 value = items[head]; head = (head + 1) % items.length; size--; notifyAll(); } return value; } }
?BlockingQueue
?.BlockingQueue是concurrent包下的一個接口,是Java標準庫中內(nèi)置的阻塞隊列。
?.BlockingQueue除了保留Queue的offer、poll、peek方法外,還提供了帶有阻塞功能的入隊列方法:put()和帶有阻塞功能的出隊列方法:take()。
?.BlockingQueue的實現(xiàn)類有:
①.LinkedBlockingQueue:基于鏈表實現(xiàn)的阻塞隊列。
②.PriorityBlockingQueue:基于優(yōu)先級隊列(堆)實現(xiàn)的阻塞隊列。③.ArrayBlockingQueue:基于數(shù)組實現(xiàn)的阻塞隊列。
知道了BlockingQueue的基本信息,接下來我們來使用BlockingQueue實現(xiàn)一個生產(chǎn)者-消費者模式:
class Test { public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(); Thread customer = new Thread(()->{ while (true) { try { int value = blockingQueue.take(); System.out.println("消費者:" + value); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread producer = new Thread(()->{ Random random = new Random(); while (true) { try { int value = random.nextInt(1000); blockingQueue.put(value); System.out.println("生產(chǎn)者:" + value); Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); customer.start(); producer.start(); customer.join(); producer.join(); } }
由于在producer線程中sleep(100),所以producer線程入隊列會很慢,producer線程一入隊列就被customer線程取出:
文章來源:http://www.zghlxwxcb.cn/news/detail-717239.html
注:由于兩個線程的輸出語句是并行執(zhí)行,故先打印生產(chǎn)者還是消費者是不確定的文章來源地址http://www.zghlxwxcb.cn/news/detail-717239.html
到了這里,關(guān)于阻塞隊列.的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!