国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

多線程與高并發(fā)--------阻塞隊列

這篇具有很好參考價值的文章主要介紹了多線程與高并發(fā)--------阻塞隊列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

四、阻塞隊列

一、基礎(chǔ)概念

1.1 生產(chǎn)者消費者概念

生產(chǎn)者消費者是設(shè)計模式的一種。讓生產(chǎn)者和消費者基于一個容器來解決強耦合問題。

生產(chǎn)者 消費者彼此之間不會直接通訊的,而是通過一個容器(隊列)進行通訊。

所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)后扔到容器中,不通用等待消費者來處理。

消費者不需要去找生產(chǎn)者要數(shù)據(jù),直接從容器中獲取即可。

而這種容器最常用的結(jié)構(gòu)就是隊列。

1.2 JUC阻塞隊列的存取方法

常用的存取方法都是來自于JUC包下的BlockingQueue

生產(chǎn)者存儲方法

add(E)     	// 添加數(shù)據(jù)到隊列,如果隊列滿了,無法存儲,拋出異常
offer(E)    // 添加數(shù)據(jù)到隊列,如果隊列滿了,返回false
offer(E,timeout,unit)   // 添加數(shù)據(jù)到隊列,如果隊列滿了,阻塞timeout時間,如果阻塞一段時間,依然沒添加進入,返回false
put(E)      // 添加數(shù)據(jù)到隊列,如果隊列滿了,掛起線程,等到隊列中有位置,再扔數(shù)據(jù)進去,死等!

消費者取數(shù)據(jù)方法

remove()    // 從隊列中移除數(shù)據(jù),如果隊列為空,拋出異常
poll()      // 從隊列中移除數(shù)據(jù),如果隊列為空,返回null,么的數(shù)據(jù)
poll(timeout,unit)   // 從隊列中移除數(shù)據(jù),如果隊列為空,掛起線程timeout時間,等生產(chǎn)者扔數(shù)據(jù),再獲取
take()     // 從隊列中移除數(shù)據(jù),如果隊列為空,線程掛起,一直等到生產(chǎn)者扔數(shù)據(jù),再獲取

二、ArrayBlockingQueue

2.1 ArrayBlockingQueue的基本使用

ArrayBlockingQueue在初始化的時候,必須指定當前隊列的長度。

因為ArrayBlockingQueue是基于數(shù)組實現(xiàn)的隊列結(jié)構(gòu),數(shù)組長度不可變,必須提前設(shè)置數(shù)組長度信息。

public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
    // 必須設(shè)置隊列的長度
    ArrayBlockingQueue queue = new ArrayBlockingQueue(4);

    // 生產(chǎn)者扔數(shù)據(jù)
    queue.add("1");
    queue.offer("2");
    queue.offer("3",2,TimeUnit.SECONDS);
    queue.put("2");

    // 消費者取數(shù)據(jù)
    System.out.println(queue.remove());
    System.out.println(queue.poll());
    System.out.println(queue.poll(2,TimeUnit.SECONDS));
    System.out.println(queue.take());
}

2.2 生產(chǎn)者方法實現(xiàn)原理

生產(chǎn)者添加數(shù)據(jù)到隊列的方法比較多,需要一個一個查看

2.2.1 ArrayBlockingQueue的常見屬性

ArrayBlockingQueue中的成員變量

lock = 就是一個ReentrantLock
count = 就是當前數(shù)組中元素的個數(shù)
iterms = 就是數(shù)組本身
# 基于putIndex和takeIndex將數(shù)組結(jié)構(gòu)實現(xiàn)為了隊列結(jié)構(gòu)
putIndex = 存儲數(shù)據(jù)時的下標
takeIndex = 去數(shù)據(jù)時的下標
notEmpty = 消費者掛起線程和喚醒線程用到的Condition(看成sync的wait和notify)
notFull = 生產(chǎn)者掛起線程和喚醒線程用到的Condition(看成sync的wait和notify)
2.2.2 add方法實現(xiàn)

add方法本身就是調(diào)用了offer方法,如果offer方法返回false,直接拋出異常

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        // 拋出的異常
        throw new IllegalStateException("Queue full");
}
2.2.3 offer方法實現(xiàn)
public boolean offer(E e) {
    // 要求存儲的數(shù)據(jù)不允許為null,為null就拋出空指針
    checkNotNull(e);
    // 當前阻塞隊列的lock鎖
    final ReentrantLock lock = this.lock;
    // 為了保證線程安全,加鎖
    lock.lock();
    try {
        // 如果隊列中的元素已經(jīng)存滿了,
        if (count == items.length)
            // 返回false
            return false;
        else {
            // 隊列沒滿,執(zhí)行enqueue將元素添加到隊列中
            enqueue(e);
            // 返回true
            return true;
        }
    } finally {
        // 操作完釋放鎖
        lock.unlock();
    }
}

//==========================================================
private void enqueue(E x) {
    // 拿到數(shù)組的引用
    final Object[] items = this.items;
    // 將元素放到指定位置
    items[putIndex] = x;
    // 對inputIndex進行++操作,并且判斷是否已經(jīng)等于數(shù)組長度,需要歸位
    if (++putIndex == items.length)
        // 將索引設(shè)置為0
        putIndex = 0;
    // 元素添加成功,進行++操作。
    count++;
    // 將一個Condition中阻塞的線程喚醒。
    notEmpty.signal();
}
2.2.4 offer(time,unit)方法

生產(chǎn)者在添加數(shù)據(jù)時,如果隊列已經(jīng)滿了,阻塞一會。

  • 阻塞到消費者消費了消息,然后喚醒當前阻塞線程
  • 阻塞到了time時間,再次判斷是否可以添加,不能,直接告辭。
// 如果線程在掛起的時候,如果對當前阻塞線程的中斷標記位進行設(shè)置,此時會拋出異常直接結(jié)束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	// 非空檢驗
    checkNotNull(e);
    // 將時間單位轉(zhuǎn)換為納秒
    long nanos = unit.toNanos(timeout);
    // 加鎖
    final ReentrantLock lock = this.lock;
    // 允許線程中斷并排除異常的加鎖方式
    lock.lockInterruptibly();
    try {
        // 為什么是while(虛假喚醒)
        // 如果元素個數(shù)和數(shù)組長度一致,隊列慢了
        while (count == items.length) {
            // 判斷等待的時間是否還充裕
            if (nanos <= 0)
                // 不充裕,直接添加失敗
                return false;
            // 掛起等待,會同時釋放鎖資源(對標sync的wait方法)
            // awaitNanos會掛起線程,并且返回剩余的阻塞時間
            // 恢復執(zhí)行時,需要重新獲取鎖資源
            nanos = notFull.awaitNanos(nanos);
        }
        // 說明隊列有空間了,enqueue將數(shù)據(jù)扔到阻塞隊列中
        enqueue(e);
        return true;
    } finally {
        // 釋放鎖資源
        lock.unlock();
    }
}
2.2.5 put方法

如果隊列是滿的, 就一直掛起,直到被喚醒,或者被中斷

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // await方法一直阻塞,直到被喚醒或者中斷標記位
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

2.3 消費者方法實現(xiàn)原理

2.3.1 remove方法
// remove方法就是調(diào)用了poll
public E remove() {
    E x = poll();
    // 如果有數(shù)據(jù),直接返回
    if (x != null)
        return x;
    // 沒數(shù)據(jù)拋出異常
    else
        throw new NoSuchElementException();
}
2.4.2 poll方法
// 拉取數(shù)據(jù)
public E poll() {
    // 加鎖操作
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果沒有數(shù)據(jù),直接返回null,如果有數(shù)據(jù),執(zhí)行dequeue,取出數(shù)據(jù)并返回
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

//==========================================================
// 取出數(shù)據(jù)
private E dequeue() {
    // 將成員變量引用到局部變量
    final Object[] items = this.items;
    // 直接獲取指定索引位置的數(shù)據(jù)
    E x = (E) items[takeIndex];
    // 將數(shù)組上指定索引位置設(shè)置為null
    items[takeIndex] = null;
    // 設(shè)置下次取數(shù)據(jù)時的索引位置
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 對count進行--操作
    count--;
    // 迭代器內(nèi)容,先跳過
    if (itrs != null)
        itrs.elementDequeued();
    // signal方法,會喚醒當前Condition中排隊的一個Node。
    // signalAll方法,會將Condition中所有的Node,全都喚醒
    notFull.signal();
    // 返回數(shù)據(jù)。
    return x;
}
2.4.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 轉(zhuǎn)換時間單位
    long nanos = unit.toNanos(timeout);
    // 競爭鎖
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果沒有數(shù)據(jù)
        while (count == 0) {
            if (nanos <= 0)
                // 沒數(shù)據(jù),也無法阻塞了,返回null
                return null;
            // 沒數(shù)據(jù),掛起消費者線程
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 取數(shù)據(jù)
        return dequeue();
    } finally {
        lock.unlock();
    }
}
2.4.4 take方法
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 虛假喚醒
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
2.4.5 虛假喚醒

阻塞隊列中,如果需要線程掛起操作,判斷有無數(shù)據(jù)的位置采用的是while循環(huán) ,為什么不能換成if

肯定是不能換成if邏輯判斷

線程A,線程B,線程E,線程C。 其中ABE生產(chǎn)者,C屬于消費者

假如線程的隊列是滿的

// E,拿到鎖資源,還沒有走while判斷
while (count == items.length)
    // A醒了
    // B掛起
    notFull.await();
enqueue(e);

C此時消費一條數(shù)據(jù),執(zhí)行notFull.signal()喚醒一個線程,A線程被喚醒

E走判斷,發(fā)現(xiàn)有空余位置,可以添加數(shù)據(jù)到隊列,E添加數(shù)據(jù),走enqueue

如果判斷是if,A在E釋放鎖資源后,拿到鎖資源,直接走enqueue方法。

此時A線程就是在putIndex的位置,覆蓋掉之前的數(shù)據(jù),造成數(shù)據(jù)安全問題

三、LinkedBlockingQueue

3.1 LinkedBlockingQueue的底層實現(xiàn)

查看LinkedBlockingQueue是如何存儲數(shù)據(jù),并且實現(xiàn)鏈表結(jié)構(gòu)的。

// Node對象就是存儲數(shù)據(jù)的單位
static class Node<E> {
    // 存儲的數(shù)據(jù)
    E item;
	// 指向下一個數(shù)據(jù)的指針
    Node<E> next;
	// 有參構(gòu)造
    Node(E x) { item = x; }
}

查看LinkedBlockingQueue的有參構(gòu)造

// 可以手動指定LinkedBlockingQueue的長度,如果沒有指定,默認為Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 在初始化時,構(gòu)建一個item為null的節(jié)點,作為head和last
	 // 這種node可以成為哨兵Node,
    // 如果沒有哨兵節(jié)點,那么在獲取數(shù)據(jù)時,需要判斷head是否為null,才能找next
    // 如果沒有哨兵節(jié)點,那么在添加數(shù)據(jù)時,需要判斷l(xiāng)ast是否為null,才能找next
    last = head = new Node<E>(null);
}

查看LinkedBlockingQueue的其他屬性

// 因為是鏈表,沒有想數(shù)組的length屬性,基于AtomicInteger來記錄長度
private final AtomicInteger count = new AtomicInteger();
// 鏈表的頭,取
transient Node<E> head;
// 鏈表的尾,存
private transient Node<E> last;
// 消費者的鎖
private final ReentrantLock takeLock = new ReentrantLock();
// 消費者的掛起操作,以及喚醒用的condition
private final Condition notEmpty = takeLock.newCondition();
// 生產(chǎn)者的鎖
private final ReentrantLock putLock = new ReentrantLock();
// 生產(chǎn)者的掛起操作,以及喚醒用的condition
private final Condition notFull = putLock.newCondition();

3.2 生產(chǎn)者方法實現(xiàn)原理

3.2.1 add方法

你懂得,還是走offer方法

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
3.2.2 offer方法
public boolean offer(E e) {
    // 非空校驗
    if (e == null) throw new NullPointerException();
    // 拿到存儲數(shù)據(jù)條數(shù)的count
    final AtomicInteger count = this.count;
    // 查看當前數(shù)據(jù)條數(shù),是否等于隊列限制長度,達到了這個長度,直接返回false
    if (count.get() == capacity)
        return false;
    // 聲明c,作為標記存在
    int c = -1;
    // 將存儲的數(shù)據(jù)封裝為Node對象
    Node<E> node = new Node<E>(e);
    // 獲取生產(chǎn)者的鎖。
    final ReentrantLock putLock = this.putLock;
    // 競爭鎖資源
    putLock.lock();
    try {
        // 再次做一個判斷,查看是否還有空間
        if (count.get() < capacity) {
            // enqueue,扔數(shù)據(jù)
            enqueue(node);
            // 將數(shù)據(jù)個數(shù) + 1
            c = count.getAndIncrement();
            // 拿到count的值 小于 長度限制
            // 有生產(chǎn)者在基于await掛起,這里添加完數(shù)據(jù)后,發(fā)現(xiàn)還有空間可以存儲數(shù)據(jù),
            // 喚醒前面可能已經(jīng)掛起的生產(chǎn)者
            // 因為這里生產(chǎn)者和消費者不是互斥的,寫操作進行的同時,可能也有消費者在消費數(shù)據(jù)。
            if (c + 1 < capacity)
                // 喚醒生產(chǎn)者
                notFull.signal();
        }
    } finally {
        // 釋放鎖資源
        putLock.unlock();
    }
    // 如果c == 0,代表添加數(shù)據(jù)之前,隊列元素個數(shù)是0個。
    // 如果有消費者在隊列沒有數(shù)據(jù)的時候,來消費,此時消費者一定會掛起線程
    if (c == 0)
        // 喚醒消費者
        signalNotEmpty();
    // 添加成功返回true,失敗返回-1
    return c >= 0;
}

//================================================
private void enqueue(Node<E> node) {
    // 將當前Node設(shè)置為last的next,并且再將當前Node作為last
    last = last.next = node;
}
//================================================
private void signalNotEmpty() {
    // 獲取讀鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 喚醒。
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
sync -> wait / notify
3.2.3 offer(time,unit)方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
	 // 非空檢驗
    if (e == null) throw new NullPointerException();
    // 將時間轉(zhuǎn)換為納秒
    long nanos = unit.toNanos(timeout);
    // 標記
    int c = -1;
    // 寫鎖,數(shù)據(jù)條數(shù)
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 允許中斷的加鎖方式
    putLock.lockInterruptibly();
    try {
        // 如果元素個數(shù)和限制個數(shù)一致,直接準備掛起
        while (count.get() == capacity) {
            // 掛起的時間是不是已經(jīng)沒了
            if (nanos <= 0)
                // 添加失敗,返回false
                return false;
            // 掛起線程
            nanos = notFull.awaitNanos(nanos);
        }
        // 有空余位置,enqueue添加數(shù)據(jù)
        enqueue(new Node<E>(e));
        // 元素個數(shù) + 1
        c = count.getAndIncrement();
        // 當前添加完數(shù)據(jù),還有位置可以添加數(shù)據(jù),喚醒可能阻塞的生產(chǎn)者
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 釋放鎖
        putLock.unlock();
    }
    // 如果之前元素個數(shù)是0,喚醒可能等待的消費者
    if (c == 0)
        signalNotEmpty();
    return true;
}
3.2.4 put方法
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 一直掛起線程,等待被喚醒
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

3.3 消費者方法實現(xiàn)原理

從remove方法開始,查看消費者獲取數(shù)據(jù)的方式

3.3.1 remove方法
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
3.3.2 poll方法
public E poll() {
    // 拿到隊列數(shù)據(jù)個數(shù)的計數(shù)器
    final AtomicInteger count = this.count;
    // 當前隊列中數(shù)據(jù)是否0
    if (count.get() == 0)
        // 說明隊列沒數(shù)據(jù),直接返回null即可
        return null;
    // 聲明返回結(jié)果
    E x = null;
    // 標記
    int c = -1;
    // 獲取消費者的takeLock
    final ReentrantLock takeLock = this.takeLock;
    // 加鎖
    takeLock.lock();
    try {
        // 基于DCL,確保當前隊列中依然有元素
        if (count.get() > 0) {
            // 從隊列中移除數(shù)據(jù)
            x = dequeue();
            // 將之前的元素個數(shù)獲取,并--
            c = count.getAndDecrement();
            if (c > 1)
                // 如果依然有數(shù)據(jù),繼續(xù)喚醒await的消費者。
                notEmpty.signal();
        }
    } finally {
        // 釋放鎖資源
        takeLock.unlock();
    }
    // 如果之前的元素個數(shù)為當前隊列的限制長度,
    // 現(xiàn)在消費者消費了一個數(shù)據(jù),多了一個空位可以添加
    if (c == capacity)
        // 喚醒阻塞的生產(chǎn)者
        signalNotFull();
    return x;
}

//================================================

private E dequeue() {
    // 拿到隊列的head位置數(shù)據(jù)
    Node<E> h = head;
    // 拿到了head的next,因為這個是哨兵Node,需要拿到的head.next的數(shù)據(jù)
    Node<E> first = h.next;
    // 將之前的哨兵Node.next置位null。help GC。
    h.next = h; 
    // 將first置位新的head
    head = first;
    // 拿到返回結(jié)果first節(jié)點的item數(shù)據(jù),也就是之前head.next.item
    E x = first.item;
    // 將first數(shù)據(jù)置位null,作為新的head
    first.item = null;
    // 返回數(shù)據(jù)
    return x;
}

//================================================

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 喚醒生產(chǎn)者。
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
3.3.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 返回結(jié)果
    E x = null;
    // 標識
    int c = -1;
    // 將掛起實現(xiàn)設(shè)置為納秒級別
    long nanos = unit.toNanos(timeout);
    // 拿到計數(shù)器
    final AtomicInteger count = this.count;
    // take鎖加鎖
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 如果沒數(shù)據(jù),進到while
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            // 掛起當前線程
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 剩下內(nèi)容,和之前一樣。
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
} 
3.3.4 take方法
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 相比poll(time,unit)方法,這里的出口只有一個,就是中斷標記位,拋出異常,否則一直等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

四、PriorityBlockingQueue概念

4.1 PriorityBlockingQueue介紹

首先PriorityBlockingQueue是一個優(yōu)先級隊列,他不滿足先進先出的概念。

會將查詢的數(shù)據(jù)進行排序,排序的方式就是基于插入數(shù)據(jù)值的本身。

如果是自定義對象必須要實現(xiàn)Comparable接口才可以添加到優(yōu)先級隊列

排序的方式是基于二叉堆實現(xiàn)的。底層是采用數(shù)據(jù)結(jié)構(gòu)實現(xiàn)的二叉堆。

4.2 二叉堆結(jié)構(gòu)介紹

優(yōu)先級隊列PriorityBlockingQueue基于二叉堆實現(xiàn)的。

private transient Object[] queue;

PriorityBlockingQueue是基于數(shù)組實現(xiàn)的二叉堆。

二叉堆是什么?

  • 二叉堆就是一個完整的二叉樹。
  • 任意一個節(jié)點大于父節(jié)點或者小于父節(jié)點
  • 基于同步的方式,可以定義出小頂堆和大頂堆

小頂堆以及小頂堆基于數(shù)據(jù)實現(xiàn)的方式。

多線程與高并發(fā)--------阻塞隊列,多線程與高并發(fā),java

4.3 PriorityBlockingQueue核心屬性

// 數(shù)組的初始長度
private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 數(shù)組的最大長度
// -8的目的是為了適配各個版本的虛擬機
// 默認當前使用的hotspot虛擬機最大支持Integer.MAX_VALUE - 2,但是其他版本的虛擬機不一定。
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 存儲數(shù)據(jù)的數(shù)組,也是基于這個數(shù)組實現(xiàn)的二叉堆。
private transient Object[] queue;

// size記錄當前阻塞隊列中元素的個數(shù)
private transient int size;

// 要求使用的對象要實現(xiàn)Comparable比較器?;赾omparator做對象之間的比較
private transient Comparator<? super E> comparator;

// 實現(xiàn)阻塞隊列的lock鎖
private final ReentrantLock lock;

// 掛起線程操作。
private final Condition notEmpty;

// 因為PriorityBlockingQueue的底層是基于二叉堆的,而二叉堆又是基于數(shù)組實現(xiàn)的,數(shù)組長度是固定的,如果需要擴容,需要構(gòu)建一個新數(shù)組。PriorityBlockingQueue在做擴容操作時,不會lock住的,釋放lock鎖,基于allocationSpinLock屬性做標記,來避免出現(xiàn)并發(fā)擴容的問題。
private transient volatile int allocationSpinLock;

// 阻塞隊列中用到的原理,其實就是普通的優(yōu)先級隊列。
private PriorityQueue<E> q;

4.4 PriorityBlockingQueue的寫入操作

畢竟是阻塞隊列,添加數(shù)據(jù)的操作,咱們是很了解,還是add,offer,offer(time,unit),put。但是因為優(yōu)先級隊列中,數(shù)組是可以擴容的,雖然有長度限制,但是依然屬于無界隊列的概念,所以生產(chǎn)者不會阻塞,所以只有offer方法可以查看。

這次核心的內(nèi)容并不是添加數(shù)據(jù)的區(qū)別。主要關(guān)注的是如何保證二叉堆中小頂堆的結(jié)構(gòu)的,并且還要查看數(shù)組擴容的一個過程是怎樣的。

4.4.1 offer基本流程

因為add方法依然調(diào)用的是offer方法,直接查看offer方法即可

public boolean offer(E e) {
    // 非空判斷。
    if (e == null)
        throw new NullPointerException();
    // 拿到鎖,直接上鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    // n:size,元素的個數(shù)
    // cap:當前數(shù)組的長度
    // array:就是存儲數(shù)據(jù)的數(shù)組
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        // 如果元素個數(shù)大于等于數(shù)組的長度,需要嘗試擴容。
        tryGrow(array, cap);
    try {
        // 拿到了比較器
        Comparator<? super E> cmp = comparator;
        // 比較數(shù)據(jù)大小,存儲數(shù)據(jù),是否需要做上移操作,保證平衡的
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // 元素個數(shù) + 1
        size = n + 1;
        // 如果有掛起的線程,需要去喚醒掛起的消費者。
        notEmpty.signal();
    } finally {
        // 釋放鎖
        lock.unlock();
    }
    // 返回true
    return true;
}
4.4.2 offer擴容操作

在添加數(shù)據(jù)之前,會采用while循環(huán)的方式,來判斷當前元素個數(shù)是否大于等于數(shù)組長度。如果滿足,需要執(zhí)行tryGrow方法,對數(shù)組進行擴容

如果兩個線程同時執(zhí)行tryGrow,只會有一個線程在擴容,另一個線程可能多次走while循環(huán),多次走tryGrow方法,但是依然需要等待前面的線程擴容完畢。

private void tryGrow(Object[] array, int oldCap) {
    // 釋放鎖資源。
    lock.unlock(); 
    // 聲明新數(shù)組。
    Object[] newArray = null;
    // 如果allocationSpinLock屬性值為0,說明當前沒有線程正在擴容的。
    if (allocationSpinLock == 0 &&
        // 基于CAS的方式,將allocationSpinLock從0修改為1,代表當前線程可以開始擴容
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {
        try {
            // 計算新數(shù)組長度
            int newCap = oldCap + ((oldCap < 64) ?
                                   // 如果數(shù)組長度比較小,這里加快擴容長度速度。
                                   (oldCap + 2) : 
                                   // 如果長度大于等于64了,每次擴容到1.5倍即可。
                                   (oldCap >> 1));
            // 如果新數(shù)組長度大于MAX_ARRAY_SIZE,需要做點事了。
            if (newCap - MAX_ARRAY_SIZE > 0) {   
                // 聲明minCap,長度為老數(shù)組 + 1
                int minCap = oldCap + 1;
                // 老數(shù)組+1變?yōu)樨摂?shù),或者老數(shù)組長度已經(jīng)大于MAX_ARRAY_SIZE了,無法擴容了。
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    // 告辭,涼涼~~~~
                    throw new OutOfMemoryError();
                // 如果沒有超過限制,直接設(shè)置為最大長度即可
                newCap = MAX_ARRAY_SIZE;
            }
            // 新數(shù)組長度,得大于老數(shù)組長度,
            // 第二個判斷確保沒有并發(fā)擴容的出現(xiàn)。
            if (newCap > oldCap && queue == array)
                // 構(gòu)建出新數(shù)組
                newArray = new Object[newCap];
        } finally {
            // 新數(shù)組有了,標記位歸0~~
            allocationSpinLock = 0;
        }
    }
    // 如果到了這,newArray依然為null,說明這個線程沒有進到if方法中,去構(gòu)建新數(shù)組
    if (newArray == null) 
        // 稍微等一手。
        Thread.yield();
    // 拿鎖資源,
    lock.lock();
    // 拿到鎖資源后,確認是構(gòu)建了新數(shù)組的線程,這里就需要將新數(shù)組復制給queue,并且導入數(shù)據(jù)
    if (newArray != null && queue == array) {
        // 將新數(shù)組賦值給queue
        queue = newArray;
        // 將老數(shù)組的數(shù)據(jù)全部導入到新數(shù)組中。
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
4.4.3 offer添加數(shù)據(jù)-上移平衡

這里是數(shù)據(jù)如何放到數(shù)組上,并且如何保證的二叉堆結(jié)構(gòu)

// k:當前元素的個數(shù)(其實就是要放的索引位置)
// x:需要添加的數(shù)據(jù)
// array:數(shù)組。。
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    // 將插入的元素直接強轉(zhuǎn)為Comparable(com.mashibing.User cannot be cast to java.lang.Comparable)
    // 這行強轉(zhuǎn),會導致添加沒有實現(xiàn)Comparable的元素,直接報錯。
    Comparable<? super T> key = (Comparable<? super T>) x;
    // k大于0,走while邏輯。(原來有數(shù)據(jù))
    while (k > 0) {
        // 獲取父節(jié)點的索引位置。
        int parent = (k - 1) >>> 1;
        // 拿到父節(jié)點的元素。
        Object e = array[parent];
        // 用子節(jié)點compareTo父節(jié)點,如果 >= 0,說明當前son節(jié)點比parent要大。
        if (key.compareTo((T) e) >= 0)
            // 直接break,完事,
            break;
        // 將son節(jié)點的位置設(shè)置上之前的parent節(jié)點
        array[k] = e;
        // 重新設(shè)置x節(jié)點需要放置的位置。
        k = parent;
    }
    // k == 0,當前元素是第一個元素,直接插入進去。
    array[k] = key;
}

4.5 PriorityBlockingQueue的讀取操作

讀取操作是存儲現(xiàn)在掛起的情況的,因為如果數(shù)組中元素個數(shù)為0,當前線程如果執(zhí)行了take方法,必然需要掛起。

其次獲取數(shù)據(jù),因為是優(yōu)先級隊列,所以需要從二叉堆棧頂拿數(shù)據(jù),直接拿索引為0的數(shù)據(jù)即可,但是拿完之后,需要保持二叉堆結(jié)構(gòu),所以會有下移操作。

4.5.1 查看獲取方法流程

poll:

public E poll() {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        // 拿到返回數(shù)據(jù),沒拿到,返回null
        return dequeue();
    } finally {
        lock.unlock();
    }
}

poll(time,unit):

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 將掛起的時間轉(zhuǎn)換為納秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 允許線程中斷拋異常的加鎖
    lock.lockInterruptibly();
    // 聲明結(jié)果
    E result;
    try {
        // dequeue是去拿數(shù)據(jù)的,可能會出現(xiàn)拿到的數(shù)據(jù)為null,如果為null,同時掛起時間還有剩余,這邊就直接通過notEmpty掛起線程
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    // 有數(shù)據(jù)正常返回,沒數(shù)據(jù),告辭~
    return result;
}

take:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            // 無線等,要么有數(shù)據(jù),要么中斷線程
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
4.5.2 查看dequeue獲取數(shù)據(jù)

獲取數(shù)據(jù)主要就是從數(shù)組中拿到0索引位置數(shù)據(jù),然后保持二叉堆結(jié)構(gòu)

private E dequeue() {
    // 將元素個數(shù)-1,拿到了索引位置。
    int n = size - 1;
    // 判斷是不是木有數(shù)據(jù)了,沒數(shù)據(jù)直接返回null即可
    if (n < 0)
        return null;
    // 說明有數(shù)據(jù)
    else {
        // 拿到數(shù)組,array
        Object[] array = queue;
        // 拿到0索引位置的數(shù)據(jù)
        E result = (E) array[0];
        // 拿到最后一個數(shù)據(jù)
        E x = (E) array[n];
        // 將最后一個位置置位null
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // 元素個數(shù)-1,賦值size
        size = n;
        // 返回result
        return result;
    }
}
4.6.3 下移做平衡操作

一定要以局部的方式去查看樹結(jié)構(gòu)的變化,他是從跟節(jié)點往下找較小的一個子節(jié)點,將較小的子節(jié)點挪動到父節(jié)點位置,再將循環(huán)往下走,如果一來,整個二叉堆的結(jié)構(gòu)就可以保證了。

// k:默認進來是0
// x:代表二叉堆的最后一個數(shù)據(jù)
// array:數(shù)組
// n:最后一個索引
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {
    // 健壯性校驗,取完第一個數(shù)據(jù),已經(jīng)沒數(shù)據(jù)了,那就不需要做平衡操作
    if (n > 0) {
        // 拿到最后一個數(shù)據(jù)的比較器
        Comparable<? super T> key = (Comparable<? super T>)x;
        // 因為二叉堆是一個二叉滿樹,所以在保證二叉堆結(jié)構(gòu)時,只需要做一半就可以
        int half = n >>> 1; 
        // 做了超過一半,就不需要再往下找了。
        while (k < half) {
            // 找左子節(jié)點索引,一個公式,可以找到當前節(jié)點的左子節(jié)點
            int child = (k << 1) + 1; 
            // 拿到左子節(jié)點的數(shù)據(jù)
            Object c = array[child];
            // 拿到右子節(jié)點索引
            int right = child + 1;
            // 確認有右子節(jié)點
            // 判斷左節(jié)點是否大于右節(jié)點
            if (right < n && c.compareTo(array[right]) > 0)
                // 如果左大于右,那么c就執(zhí)行右
                c = array[child = right];
            // 比較最后一個節(jié)點是否小于當前的較小的子節(jié)點
            if (key.compareTo((T) c) <= 0)
                break;
            // 將左右子節(jié)點較小的放到之前的父節(jié)點位置
            array[k] = c;
            // k重置到之前的子節(jié)點位置
            k = child;
        }
        // 上面while循環(huán)搞定后,可以確認整個二叉堆中,數(shù)據(jù)已經(jīng)移動ok了,只差當前k的位置數(shù)據(jù)是null
        // 將最后一個索引的數(shù)據(jù)放到k的位置
        array[k] = key;
    }
}

五、DelayQueue

5.1 DelayQueue介紹&應(yīng)用

DelayQueue就是一個延遲隊列,生產(chǎn)者寫入一個消息,這個消息還有直接被消費的延遲時間。

需要讓消息具有延遲的特性。

DelayQueue也是基于二叉堆結(jié)構(gòu)實現(xiàn)的,甚至本事就是基于PriorityQueue實現(xiàn)的功能。二叉堆結(jié)構(gòu)每次獲取的是棧頂?shù)臄?shù)據(jù),需要讓DelayQueue中的數(shù)據(jù),在比較時,跟根據(jù)延遲時間做比較,剩余時間最短的要放在棧頂。

查看DelayQueue類信息:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    // 發(fā)現(xiàn)DelayQueue中的元素,需要繼承Delayed接口。
}
// ==========================================
// 接口繼承了Comparable,這樣就具備了比較的能力。
public interface Delayed extends Comparable<Delayed> {
    // 抽象方法,就是咱們需要設(shè)置的延遲時間
    long getDelay(TimeUnit unit);
  
    // Comparable接口提供的:public int compareTo(T o);
}

基于上述特點,聲明一個可以寫入DelayQueue的元素類

public class Task implements Delayed {

    /** 任務(wù)的名稱 */
    private String name;

    /** 什么時間點執(zhí)行 */
    private Long time;

    /**
     *
     * @param name
     * @param delay  單位毫秒。
     */
    public Task(String name, Long delay) {
        // 任務(wù)名稱
        this.name = name;
        this.time = System.currentTimeMillis() + delay;
    }

    /**
     * 設(shè)置任務(wù)什么時候可以出延遲隊列
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
		// 單位是毫秒,視頻里寫錯了,寫成了納秒,
        return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    /**
     * 兩個任務(wù)在插入到延遲隊列時的比較方式
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.time - ((Task)o).getTime());
    }
}

在使用時,查看到DelayQueue底層用了PriorityQueue,在一定程度上,DelayQueue也是無界隊列。

測試效果

public static void main(String[] args) throws InterruptedException {
    // 聲明元素
    Task task1 = new Task("A",1000L);
    Task task2 = new Task("B",5000L);
    Task task3 = new Task("C",3000L);
    Task task4 = new Task("D",2000L);
    // 聲明阻塞隊列
    DelayQueue<Task> queue = new DelayQueue<>();
    // 將元素添加到延遲隊列中
    queue.put(task1);
    queue.put(task2);
    queue.put(task3);
    queue.put(task4);
    // 獲取元素
    System.out.println(queue.take());
    System.out.println(queue.take());
    System.out.println(queue.take());
    System.out.println(queue.take());
    // A,D,C,B
}

在應(yīng)用時,外賣,15分鐘商家需要節(jié)點,如果不節(jié)點,這個訂單自動取消。

可以每下一個訂單,就放到延遲隊列中,如果規(guī)定時間內(nèi),商家沒有節(jié)點,直接通過消費者獲取元素,然后取消訂單。

只要是有需要延遲一定時間后,再執(zhí)行的任務(wù),就可以通過延遲隊列去實現(xiàn)。

5.2、DelayQueue核心屬性

可以查看到DelayQueue就四個核心屬性

// 因為DelayQueue依然屬于阻塞隊列,需要保證線程安全。看到只有一把鎖,生產(chǎn)者和消費者使用的是一個lock
private final transient ReentrantLock lock = new ReentrantLock();
// 因為DelayQueue還是基于二叉堆結(jié)構(gòu)實現(xiàn)的,沒有必要重新搞一個二叉堆,直接使用的PriorityQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般會存儲等待棧頂數(shù)據(jù)的消費者,在整體寫入和消費的過程中,會設(shè)置的leader的一些判斷。
private Thread leader = null;
// 生產(chǎn)者在插入數(shù)據(jù)時,不會阻塞的。當前的Condition就是給消費者用的
// 比如消費者在獲取數(shù)據(jù)時,發(fā)現(xiàn)棧頂?shù)臄?shù)據(jù)還又沒到延遲時間。
// 這個時候,咱們就需要將消費者線程掛起,阻塞一會,阻塞到元素到了延遲時間,或者是,生產(chǎn)者插入的元素到了棧頂,此時生產(chǎn)者會喚醒消費者。
private final Condition available = lock.newCondition();

5.3、DelayQueue寫入流程分析

Delay是無界的,數(shù)組可以動態(tài)的擴容,不需要關(guān)注生產(chǎn)者的阻塞問題,他就沒有阻塞問題。

這里只需要查看offer方法即可。

public boolean offer(E e) {
    // 直接獲取lock,加鎖。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 直接調(diào)用PriorityQueue的插入方法,這里會根據(jù)之前重寫Delayed接口中的compareTo方法做排序,然后調(diào)整上移和下移操作。
        q.offer(e);
        // 調(diào)用優(yōu)先級隊列的peek方法,拿到堆頂?shù)臄?shù)據(jù)
        // 拿到堆頂數(shù)據(jù)后,判斷是否是剛剛插入的元素
        if (q.peek() == e) {
            // leader賦值為null。在消費者的位置再提一嘴
            leader = null;
            // 喚醒消費者,避免剛剛插入的數(shù)據(jù)的延遲時間出現(xiàn)問題。
            available.signal();
        }
        // 插入成功,
        return true;
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

5.4、DelayQueue讀取流程分析

消費者依然還是存在阻塞的情況,因為有兩個情況

  • 消費者要拿到棧頂數(shù)據(jù),但是延遲時間還沒到,此時消費者需要等待一會。
  • 消費者要來拿數(shù)據(jù),但是發(fā)現(xiàn)已經(jīng)有消費者在等待棧頂數(shù)據(jù)了,這個后來的消費者也需要等待一會。

依然需要查看四個方法的實現(xiàn)

5.4.1 remove方法
// 依然是AbstractQueue提供的方法,有結(jié)果就返回,沒結(jié)果扔異常
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
5.4.2 poll方法
// poll是淺嘗一下,不會阻塞消費者,能拿就拿,拿不到就拉倒
public E poll() {
    // 消費者和生產(chǎn)者是一把鎖,先拿鎖,加鎖。
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
       	 // 拿到棧頂數(shù)據(jù)。
        E first = q.peek();
        // 如果元素為null,直接返回null
        // 如果getDelay方法返回的結(jié)果是大于0的,那說明當前元素還每到延遲時間,元素無法返回,返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            // 到這說明元素不為null,并且已經(jīng)達到了延遲時間,直接調(diào)用優(yōu)先級隊列的poll方法
            return q.poll();
    } finally {
        // 釋放鎖。
        lock.unlock();
    }
}
5.4.3 poll(time,unit)方法

這個是允許阻塞的,并且指定一定的時間

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 先將時間轉(zhuǎn)為納秒
    long nanos = unit.toNanos(timeout);
    // 拿鎖,加鎖。
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 死循環(huán)。
        for (;;) {
            // 拿到堆頂數(shù)據(jù)
            E first = q.peek();
            // 如果元素為null
            if (first == null) {
                // 并且等待的時間小于等于0。不能等了,直接返回null
                if (nanos <= 0)
                    return null;
                // 說明當前線程還有可以阻塞的時間,阻塞指定時間即可。
                else
                    // 這里掛起線程后,說明隊列沒有元素,在生產(chǎn)者添加數(shù)據(jù)之后,會喚醒
                    nanos = available.awaitNanos(nanos);
            // 到這說明,有數(shù)據(jù)
            } else {
                // 有數(shù)據(jù)的話,先獲取數(shù)據(jù)現(xiàn)在是否可以執(zhí)行,延遲時間是否已經(jīng)到了指定時間
                long delay = first.getDelay(NANOSECONDS);
                // 延遲時間是否已經(jīng)到了,
                if (delay <= 0)
                    // 時間到了,直接執(zhí)行優(yōu)先級隊列的poll方法,返回元素
                    return q.poll();
                // ==================延遲時間沒到,消費者需要等一會===================
                // 這個是查看消費者可以等待的時間,
                if (nanos <= 0)
                    // 直接返回nulll
                    return null;
                // ==================延遲時間沒到,消費者可以等一會===================
                // 把first賦值為null
                first = null; 
                // 如果等待的時間,小于元素剩余的延遲時間,消費者直接掛起。反正暫時拿不到,但是不能保證后續(xù)是否有生產(chǎn)者添加一個新的數(shù)據(jù),我是可以拿到的。
                // 如果已經(jīng)有一個消費者在等待堆頂數(shù)據(jù)了,我這邊不做額外操作,直接掛起即可。
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                // 當前消費者的阻塞時間可以拿到數(shù)據(jù),并且沒有其他消費者在等待堆頂數(shù)據(jù)
                else {
                    // 拿到當前消費者的線程對象
                    Thread thisThread = Thread.currentThread();
                    // 將leader設(shè)置為當前線程
                    leader = thisThread;
                    try {
                        // 會讓當前消費者,阻塞這個元素的延遲時間
                        long timeLeft = available.awaitNanos(delay);
                        // 重新計算當前消費者剩余的可阻塞時間,。
                        nanos -= delay - timeLeft;
                    } finally {
                        // 到了時間,將leader設(shè)置為null
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 沒有消費者在等待元素,隊列中的元素不為null
        if (leader == null && q.peek() != null)
            // 只要當前沒有l(wèi)eader在等,并且隊列有元素,就需要再次喚醒消費者。、
            // 避免隊列有元素,但是沒有消費者處理的問題
            available.signal();
        // 釋放鎖
        lock.unlock();
    }
}
5.4.4 take方法

這個是允許阻塞的,但是可以一直等,要么等到元素,要么等到被中斷。

public E take() throws InterruptedException {
    // 正常加鎖,并且允許中斷
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 拿到元素
            E first = q.peek();
            if (first == null)
                // 沒有元素掛起。
                available.await();
            else {
                // 有元素,獲取延遲時間。
                long delay = first.getDelay(NANOSECONDS);
                // 判斷延遲時間是不是已經(jīng)到了
                if (delay <= 0)
                    // 基于優(yōu)先級隊列的poll方法返回
                    return q.poll();
                first = null; 
                // 如果有消費者在等,就正常await掛起
                if (leader != null)
                    available.await();
                // 如果沒有消費者在等的堆頂數(shù)據(jù),我來等
                else {
                    // 獲取當前線程
                    Thread thisThread = Thread.currentThread();
                    // 設(shè)置為leader,代表等待堆頂?shù)臄?shù)據(jù)
                    leader = thisThread;
                    try {
                        // 等待指定(堆頂元素的延遲時間)時長,
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            // leader賦值null
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 避免消費者無限等,來一個喚醒消費者的方法,一般是其他消費者拿到元素走了之后,并且延遲隊列還有元素,就執(zhí)行if內(nèi)部喚醒方法
        if (leader == null && q.peek() != null)
            available.signal();
        // 釋放鎖
        lock.unlock();
    }
}

六、SynchronousQueue

6.1 SynchronousQueue介紹

SynchronousQueue這個阻塞隊列和其他的阻塞隊列有很大的區(qū)別

在咱們的概念中,隊列肯定是要存儲數(shù)據(jù)的,但是SynchronousQueue不會存儲數(shù)據(jù)的

SynchronousQueue隊列中,他不存儲數(shù)據(jù),存儲生產(chǎn)者或者是消費者

當存儲一個生產(chǎn)者到SynchronousQueue隊列中之后,生產(chǎn)者會阻塞(看你調(diào)用的方法)

生產(chǎn)者最終會有幾種結(jié)果:

  • 如果在阻塞期間有消費者來匹配,生產(chǎn)者就會將綁定的消息交給消費者
  • 生產(chǎn)者得等阻塞結(jié)果,或者不允許阻塞,那么就直接失敗
  • 生產(chǎn)者在阻塞期間,如果線程中斷,直接告辭。

同理,消費者和生產(chǎn)者的效果是一樣。

生產(chǎn)者和消費者的數(shù)據(jù)是直接傳遞的,不會經(jīng)過SynchronousQueue。

SynchronousQueue是不會存儲數(shù)據(jù)的。

經(jīng)過阻塞隊列的學習:

生產(chǎn)者:

  • offer():生產(chǎn)者在放到SynchronousQueue的同時,如果有消費者在等待消息,直接配對。如果沒有消費者在等待消息,這里直接返回,告辭。
  • offer(time,unit):生產(chǎn)者在放到SynchronousQueue的同時,如果有消費者在等待消息,直接配對。如果沒有消費者在等待消息,阻塞time時間,如果還沒有,告辭。
  • put():生產(chǎn)者在放到SynchronousQueue的同時,如果有消費者在等待消息,直接配對。如果沒有,死等。

消費者:poll(),poll(time,unit),take()。道理和上面的生產(chǎn)者一致。

測試效果:

public static void main(String[] args) throws InterruptedException {
    // 因為當前隊列不存在數(shù)據(jù),沒有長度的概念。
    SynchronousQueue queue = new SynchronousQueue();

    String msg = "消息!";
    /*new Thread(() -> {
        // b = false:代表沒有消費者來拿
        boolean b = false;
        try {
            b = queue.offer(msg,1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(b);
    }).start();

    Thread.sleep(100);

    new Thread(() -> {
        System.out.println(queue.poll());
    }).start();*/
    new Thread(() -> {
        try {
            System.out.println(queue.poll(1, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    Thread.sleep(100);

    new Thread(() -> {
        queue.offer(msg);
    }).start();
}

6.2 SynchronousQueue核心屬性

進到SynchronousQueue類的內(nèi)部后,發(fā)現(xiàn)了一個內(nèi)部類,Transferer,內(nèi)部提供了一個transfer的方法

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

當前這個類中提供的transfer方法,就是生產(chǎn)者和消費者在調(diào)用讀寫數(shù)據(jù)時要用到的核心方法。

生產(chǎn)者在調(diào)用上述的transfer方法時,第一個參數(shù)e會正常傳遞數(shù)據(jù)

消費者在調(diào)用上述的transfer方法時,第一個參數(shù)e會傳遞null

SynchronousQueue針對抽象類Transferer做了幾種實現(xiàn)。

一共看到了兩種實現(xiàn)方式:

  • TransferStack
  • TransferQueue

這兩種類繼承了Transferer抽象類,在構(gòu)建SynchronousQueue時,會指定使用哪種子類

// 到底采用哪種實現(xiàn),需要把對應(yīng)的對象存放到這個屬性中
private transient volatile Transferer<E> transferer;
// 采用無參時,會調(diào)用下述方法,再次調(diào)用有參構(gòu)造傳入false
public SynchronousQueue() {
    this(false);
}
// 調(diào)用的是當前的有參構(gòu)造,fair代表公平還是不公平
public SynchronousQueue(boolean fair) {
    // 如果是公平,采用Queue,如果是不公平,采用Stack
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

TransferQueue的特點

多線程與高并發(fā)--------阻塞隊列,多線程與高并發(fā),java

多線程與高并發(fā)--------阻塞隊列,多線程與高并發(fā),java

代碼查看效果

public static void main(String[] args) throws InterruptedException {
    // 因為當前隊列不存在數(shù)據(jù),沒有長度的概念。
    SynchronousQueue queue = new SynchronousQueue(true);
    SynchronousQueue queue = new SynchronousQueue(false);

    new Thread(() -> {
        try {
            queue.put("生1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            queue.put("生2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            queue.put("生3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("消1:" + queue.poll());
    }).start();
    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("消2:" + queue.poll());
    }).start();
    Thread.sleep(100);
    new Thread(() -> {
        System.out.println("消3:" + queue.poll());
    }).start();
}

6.3 SynchronousQueue的TransferQueue源碼

為了查看清除SynchronousQueue的TransferQueue源碼,需要從兩點開始查看源碼信息

6.3.1 QNode源碼信息
static final class QNode {
    // 當前節(jié)點可以獲取到next節(jié)點
    volatile QNode next;  
    // item在不同情況下效果不同
    // 生產(chǎn)者:有數(shù)據(jù)
    // 消費者:為null
    volatile Object item;   
    // 當前線程
    volatile Thread waiter;   
    // 當前屬性是用來區(qū)分消費者和生產(chǎn)者的屬性
    final boolean isData;
    // 最終生產(chǎn)者需要將item交給消費者
    // 最終消費者需要獲取生產(chǎn)者的item
  
    // 省略了大量提供的CAS操作
    ....
}
6.3.2 transfer方法實現(xiàn)
// 當前方法是TransferQueue的核心內(nèi)容
// e:傳遞的數(shù)據(jù)
// timed:false,代表無限阻塞,true,代表阻塞nacos時間
E transfer(E e, boolean timed, long nanos) {
    // 當前QNode是要封裝當前生產(chǎn)者或者消費者的信息
    QNode s = null; 
    // isData == true:代表是生產(chǎn)者
    // isData == false:代表是消費者
    boolean isData = (e != null);
    // 死循環(huán)
    for (;;) {
        // 獲取尾節(jié)點和頭結(jié)點
        QNode t = tail;
        QNode h = head;
        // 為了避免TransferQueue還沒有初始化,這邊做一個健壯性判斷
        if (t == null || h == null)   
            continue;  

        // 如果滿足h == t 條件,說明當前隊列沒有生產(chǎn)者或者消費者,為空
        // 如果有節(jié)點,同時當前節(jié)點和隊列節(jié)點屬于同一種角色。
        // if中的邏輯是進到隊列
        if (h == t || t.isData == isData) { 
            // ===================在判斷并發(fā)問題==========================
            // 拿到尾節(jié)點的next
            QNode tn = t.next;
            // 如果t不為尾節(jié)點,進來說明有其他線程并發(fā)修改了tail
            if (t != tail)   
                // 重新走for循環(huán)   
                continue;
            // tn如果為不null,說明前面有線程并發(fā),添加了一個節(jié)點
            if (tn != null) {  
                // 直接幫助那個并發(fā)線程修改tail的指向   
                advanceTail(t, tn);
                // 重新走for循環(huán)   
                continue;
            }
            // 獲取當前線程是否可以阻塞
            // 如果timed為true,并且阻塞的時間小于等于0
            // 不需要匹配,直接告辭?。?!
            if (timed && nanos <= 0)   
                return null;
            // 如果可以阻塞,將當前需要插入到隊列的QNode構(gòu)建出來
            if (s == null)
                s = new QNode(e, isData);
            // 基于CAS操作,將tail節(jié)點的next設(shè)置為當前線程
            if (!t.casNext(null, s))   
                // 如果進到if,說明修改失敗,重新執(zhí)行for循環(huán)修改   
                continue;
            // CAS操作成功,直接替換tail的指向
            advanceTail(t, s);   
            // 如果進到隊列中了,掛起線程,要么等生產(chǎn)者,要么等消費者。
            // x是返回替換后的數(shù)據(jù)
            Object x = awaitFulfill(s, e, timed, nanos);
            // 如果元素和節(jié)點相等,說明節(jié)點取消了
            if (x == s) {  
                // 清空當前節(jié)點,將上一個節(jié)點的next指向當前節(jié)點的next,直接告辭   
                clean(t, s);
                return null;
            }
            // 判斷當前節(jié)點是否還在隊列中
            if (!s.isOffList()) {   
                // 將當前節(jié)點設(shè)置為head
                advanceHead(t, s);   
                // 如果 x != null, 如果拿到了數(shù)據(jù),說明我是消費者
                if (x != null)   
                    // 將當前節(jié)點的item設(shè)置為自己   
                    s.item = s;
                // 線程置位null
                s.waiter = null;
            }
            // 返回數(shù)據(jù)
            return (x != null) ? (E)x : e;
        } 
        // 匹配隊列中的橘色
        else {   
            // 拿到head的next,作為要匹配的節(jié)點   
            QNode m = h.next;  
            // 做并發(fā)判斷,如果頭節(jié)點,尾節(jié)點,或者head.next發(fā)生了變化,這邊要重新走for循環(huán)
            if (t != tail || m == null || h != head)
                continue;  
            // 沒并發(fā)問題,可以拿數(shù)據(jù)
            // 拿到m節(jié)點的item作為x。
            Object x = m.item;
            // 如果isData == (x != null)滿足,說明當前出現(xiàn)了并發(fā)問題,避免并發(fā)消費出現(xiàn)坑
            if (isData == (x != null) ||  
                // 如果排隊的節(jié)點取消,就會講當前QNode中的item指向QNode
                x == m ||   
                // 如果前面兩個都沒滿足,可以交換數(shù)據(jù)了。 
                // 如果交換失敗,說明有并發(fā)問題,
                !m.casItem(x, e)) {   
                // 重新設(shè)置head節(jié)點,并且再走一次循環(huán)  
                advanceHead(h, m);  
                continue;
            }
            // 替換head
            advanceHead(h, m);  
            // 喚醒head.next中的線程
            LockSupport.unpark(m.waiter);
            // 這邊匹配好了,數(shù)據(jù)也交換了,直接返回
            // 如果 x != null,說明隊列中是生產(chǎn)者,當前是消費者,這邊直接返回x具體數(shù)據(jù)
            // 反之,隊列中是消費者,當前是生產(chǎn)者,直接返回自己的數(shù)據(jù)
            return (x != null) ? (E)x : e;
        }
    }
}
6.3.3 tansfer方法流程圖

多線程與高并發(fā)--------阻塞隊列,多線程與高并發(fā),java文章來源地址http://www.zghlxwxcb.cn/news/detail-642523.html

到了這里,關(guān)于多線程與高并發(fā)--------阻塞隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【Java|多線程與高并發(fā)】定時器(Timer)詳解

    【Java|多線程與高并發(fā)】定時器(Timer)詳解

    在Java中,定時器 Timer 類是用于執(zhí)行定時任務(wù)的工具類。它允許你安排一個任務(wù)在未來的某個時間點執(zhí)行,或者以固定的時間間隔重復執(zhí)行。 在服務(wù)器開發(fā)中,客戶端向服務(wù)器發(fā)送請求,然后等待服務(wù)器響應(yīng). 但服務(wù)器什么時候返回響應(yīng),并不確定. 但也不能讓客戶端一直等下去

    2024年02月07日
    瀏覽(20)
  • 【Java|多線程與高并發(fā)】wait和notify方法詳解

    【Java|多線程與高并發(fā)】wait和notify方法詳解

    在Java多線程環(huán)境中,線程之間是搶占式執(zhí)行的,線程的調(diào)度是隨機的.這就很難受了. 在很多情況下我們希望線程以我們想要的順序來執(zhí)行. 這就需要 wait 和 notify 這兩個方法 首先是 wait 方法 wait是 Object 類的方法,而Java中的類都是間接或直接繼承于Object類. 因此只要是類的實例都可

    2024年02月10日
    瀏覽(21)
  • 【Java|多線程與高并發(fā)】JUC中常用的類和接口

    【Java|多線程與高并發(fā)】JUC中常用的類和接口

    JUC是Java并發(fā)編程中的一個重要模塊,全稱為 Java Util Concurrent (Java并發(fā)工具包),它提供了一組用于多線程編程的工具類和框架,幫助開發(fā)者更方便地編寫線程安全的并發(fā)代碼。 本文主要介紹 Java Util Concurrent 下的一些常用接口和類 Callable接口類似于Runnable. 有一點區(qū)別就是

    2024年02月12日
    瀏覽(20)
  • 【Java|多線程與高并發(fā)】設(shè)計模式-單例模式(餓漢式,懶漢式和靜態(tài)內(nèi)部類)

    【Java|多線程與高并發(fā)】設(shè)計模式-單例模式(餓漢式,懶漢式和靜態(tài)內(nèi)部類)

    設(shè)計模式是一種在軟件開發(fā)中常用的解決復雜問題的方法論。它提供了一套經(jīng)過驗證的解決方案,用于解決特定類型問題的設(shè)計和實現(xiàn)。設(shè)計模式可以幫助開發(fā)人員提高代碼的可重用性、可維護性和可擴展性。 設(shè)計模式有很多,本文主要介紹單例模式. 單例模式是一種創(chuàng)建型設(shè)

    2024年02月11日
    瀏覽(28)
  • java高并發(fā)系列 - 第25天:掌握JUC中的阻塞隊列

    這是java高并發(fā)系列第25篇文章。 環(huán)境:jdk1.8。 本文內(nèi)容 掌握Queue、BlockingQueue接口中常用的方法 介紹6中阻塞隊列,及相關(guān)場景示例 重點掌握4種常用的阻塞隊列 Queue接口 隊列是一種先進先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),java中用Queue接口來表示隊列。 Queue接口中定義了6個方法:

    2024年02月14日
    瀏覽(59)
  • 深入淺出Java多線程(十三):阻塞隊列

    大家好,我是你們的老伙計秀才!今天帶來的是[深入淺出Java多線程]系列的第十三篇內(nèi)容:阻塞隊列。大家覺得有用請點贊,喜歡請關(guān)注!秀才在此謝過大家了?。?! 在多線程編程的世界里,生產(chǎn)者-消費者問題是一個經(jīng)典且頻繁出現(xiàn)的場景。設(shè)想這樣一個情況:有一群持續(xù)

    2024年03月20日
    瀏覽(28)
  • 【Java】多線程案例(單例模式,阻塞隊列,定時器,線程池)

    【Java】多線程案例(單例模式,阻塞隊列,定時器,線程池)

    ?? Author: 老九 ?? 個人博客:老九的CSDN博客 ?? 個人名言:不可控之事 樂觀面對 ?? 系列專欄: 單例模式是設(shè)計模式之一。代碼當中的某個類,只能有一個實例,不能有多個。單例模式分為:餓漢模式和懶漢模式 餓漢模式表示很著急,就想吃完飯剩下很多碗,然后一

    2024年02月06日
    瀏覽(51)
  • 多線程與高并發(fā)--------線程池

    多線程與高并發(fā)--------線程池

    在開發(fā)中,為了提升效率的操作,我們需要將一些業(yè)務(wù)采用多線程的方式去執(zhí)行。 比如有一個比較大的任務(wù),可以將任務(wù)分成幾塊,分別交給幾個線程去執(zhí)行,最終做一個匯總就可以了。 比如做業(yè)務(wù)操作時,需要發(fā)送短信或者是發(fā)送郵件,這種操作也可以基于異步的方式完

    2024年02月13日
    瀏覽(26)
  • 多線程與高并發(fā)——并發(fā)編程(4)

    1.1 生產(chǎn)者消費者概念 生產(chǎn)者-消費者是設(shè)計模式的一種,讓生產(chǎn)者和消費者基于一個容器來解決強耦合的問題。生產(chǎn)者與消費者彼此之間不會直接通訊,而是通過一個容器(隊列)進行通訊。 生產(chǎn)者生產(chǎn)完數(shù)據(jù)后扔到容器中,不用等消費者來處理; 消費者也不需要去找生產(chǎn)

    2024年02月10日
    瀏覽(19)
  • 多線程與高并發(fā)——并發(fā)編程(5)

    多線程與高并發(fā)——并發(fā)編程(5)

    為什么要使用線程池? 在開發(fā)中,為了提升效率,我們需要將一些業(yè)務(wù)采用多線程的方式去執(zhí)行。比如,有一個比較大的任務(wù),可以將任務(wù)分成幾塊,分別交給幾個線程去執(zhí)行,最終做一個匯總即可。再比如,做業(yè)務(wù)操作時,需要發(fā)送短信或郵件,這些操作也可以基于異步的

    2024年02月09日
    瀏覽(37)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包