国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

多線(xiàn)程與高并發(fā)——并發(fā)編程(4)

這篇具有很好參考價(jià)值的文章主要介紹了多線(xiàn)程與高并發(fā)——并發(fā)編程(4)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

四、阻塞隊(duì)列

1 基礎(chǔ)概念

1.1 生產(chǎn)者消費(fèi)者概念

生產(chǎn)者-消費(fèi)者是設(shè)計(jì)模式的一種,讓生產(chǎn)者和消費(fèi)者基于一個(gè)容器來(lái)解決強(qiáng)耦合的問(wèn)題。生產(chǎn)者與消費(fèi)者彼此之間不會(huì)直接通訊,而是通過(guò)一個(gè)容器(隊(duì)列)進(jìn)行通訊。

  • 生產(chǎn)者生產(chǎn)完數(shù)據(jù)后扔到容器中,不用等消費(fèi)者來(lái)處理;
  • 消費(fèi)者也不需要去找生產(chǎn)者要數(shù)據(jù),直接從容器中獲取即可;
  • 而這種容器最常用的結(jié)構(gòu)就是隊(duì)列。

1.2 JUC阻塞隊(duì)列的存取方法

常用的存取方法都來(lái)自 JUC 包下的 BlockingQueue

  • 生產(chǎn)者存儲(chǔ)方法:
    • add(E):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,拋出異常;
    • offer(E):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,返回 false;
    • offer(E,timeout,unit):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,阻塞 timeout 時(shí)間,超時(shí)后返回 false;
    • put(E):添加數(shù)據(jù)到隊(duì)列,若隊(duì)列滿(mǎn)了,掛起線(xiàn)程,等到隊(duì)列中有位置,再扔數(shù)據(jù)進(jìn)去,死等。
  • 消費(fèi)者取數(shù)據(jù)方法:
    • remove():從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,拋出異常;
    • poll():從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,返回 false;
    • poll(timeout,unit):從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,阻塞 timeout 時(shí)間,等生產(chǎn)者仍數(shù)據(jù)再獲取數(shù)據(jù),超時(shí)后返回 false;
    • take():從隊(duì)列中移除數(shù)據(jù),若隊(duì)列為空,掛起線(xiàn)程,一直等生產(chǎn)者仍數(shù)據(jù)再獲取。

2 ArrayBlockingQueue

2.1 ArrayBlockingQueue的基本使用

  • ArrayBlockingQueue 在初始化時(shí),必須指定當(dāng)前隊(duì)列的長(zhǎng)度,因?yàn)?ArrayBlockingQueue 是基于數(shù)組實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu),數(shù)組長(zhǎng)度不可變,必須提前設(shè)置數(shù)據(jù)長(zhǎng)度信息。
public static void main(String[] args) throws InterruptedException {
   
    // 必須設(shè)置隊(duì)列長(zhǎng)度
    ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
    // 生產(chǎn)者生產(chǎn)數(shù)據(jù)
    queue.add("1");
    queue.offer("2");
    queue.offer("3", 2, TimeUnit.SECONDS);
    queue.put("4");
    // 消費(fèi)者消費(fèi)數(shù)據(jù)
    System.out.println(queue.remove());
    System.out.println(queue.poll());
    System.out.println(queue.poll(2, TimeUnit.SECONDS));
    System.out.println(queue.take());
}

2.2 生產(chǎn)者方法實(shí)現(xiàn)原理

  • 生產(chǎn)者添加數(shù)據(jù)到隊(duì)列的方法比較多,需要一個(gè)一個(gè)看
2.2.1 ArrayBlockingQueue的常見(jiàn)屬性

ArrayBlockingQueue中的成員變量

final Object[] items; 				// 就是數(shù)組本身
int takeIndex;						// 取數(shù)據(jù)的下標(biāo)
int putIndex;						// 存數(shù)據(jù)的下標(biāo)
int count;							// 當(dāng)前數(shù)組中元素的個(gè)數(shù)
final ReentrantLock lock;			// 就是一個(gè) ReentrantLock 鎖
private final Condition notEmpty;	// 消費(fèi)者掛起線(xiàn)程和喚醒線(xiàn)程用到的Condition(可看作是synchronized的wait和notify)
private final Condition notFull;	// 生產(chǎn)者掛起線(xiàn)程和喚醒線(xiàn)程用到的Condition(可看作是synchronized的wait和notify)
2.2.2 add方法
  • add方法本身就是調(diào)用了offer方法,如果offer方法返回false,直接拋出異常
public boolean add(E e) {
   
    if (offer(e))
        return true;
    else 	// 拋出的異常
        throw new IllegalStateException("Queue full");
}
2.2.3 offer方法
public boolean offer(E e) {
   
    checkNotNull(e);	// 要求存儲(chǔ)的數(shù)據(jù)不允許為null,否則拋出空指針異常
	// 拿到當(dāng)前阻塞隊(duì)列的lock鎖
    final ReentrantLock lock = this.lock;
    lock.lock();	// 為保證線(xiàn)程安全,加鎖
    try {
   
		// 判斷隊(duì)列中元素是否滿(mǎn)了,若滿(mǎn)了,則返回false
        if (count == items.length)
            return false;
        else {
   
			// 隊(duì)列沒(méi)滿(mǎn),執(zhí)行 enqueue 將元素添加到隊(duì)列中,并返回true
            enqueue(e);
            return true;
        }
    } finally {
   
        lock.unlock();		// 操作完釋放鎖
    }
}
// ================
private void enqueue(E x) {
   
    // 拿到數(shù)組的引用,將元素放到指定的位置
    final Object[] items = this.items;
    items[putIndex] = x;
	// 對(duì)putIndex進(jìn)行++操作,并判斷是否等于數(shù)組長(zhǎng)度,需要?dú)w為
    if (++putIndex == items.length)
        putIndex = 0;	// 歸位:將索引值設(shè)置為0
    count++;	// 添加成功,數(shù)據(jù)++
    notEmpty.signal();	// 將一個(gè)Condition中阻塞的線(xiàn)程喚醒
}
2.2.4 offer(time,unit)方法

生產(chǎn)者在添加數(shù)據(jù)時(shí),如果隊(duì)列已經(jīng)滿(mǎn),阻塞一會(huì):

  • 阻塞到消費(fèi)者消費(fèi)了消息,然后喚醒當(dāng)前阻塞線(xiàn)程;
  • 阻塞到了 timeout 時(shí)間,再次判斷是否可以添加,若不能直接告辭。
// 線(xiàn)程在掛起時(shí),如果對(duì)當(dāng)前阻塞線(xiàn)程的終端標(biāo)記位進(jìn)行設(shè)置,會(huì)拋出異常直接結(jié)束
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
   
	// 非空校驗(yàn)
    checkNotNull(e);
    long nanos = unit.toNanos(timeout);		// 將時(shí)間單位轉(zhuǎn)為納秒
    final ReentrantLock lock = this.lock;	// 加鎖
    lock.lockInterruptibly();	// 允許線(xiàn)程中斷排除異常的加鎖方法
    try {
   
        // 為什么是while(虛假喚醒)
        while (count == items.length) {
   	// 如果元素個(gè)數(shù)和數(shù)組長(zhǎng)度一致,說(shuō)明隊(duì)列滿(mǎn)了
            if (nanos <= 0)	// 判斷等待時(shí)間是否充裕
                return false;	// 不充裕,直接添加失敗,返回false
            // 掛起等待,會(huì)同時(shí)釋放鎖資源(對(duì)標(biāo) synchronized 的wait方法)
            // awaitNanos會(huì)掛起線(xiàn)程,并且返回剩余的阻塞時(shí)間,恢復(fù)執(zhí)行時(shí),需要重新獲取鎖資源
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e); // 這里鎖門(mén)隊(duì)列有空間了,enqueue將數(shù)據(jù)添加到阻塞隊(duì)列中,并返回true
        return true;
    } finally {
   
        lock.unlock();	// 是否鎖資源
    }
}
2.2.5 put方法
  • 如果隊(duì)列是滿(mǎn)的,就一直掛起,直到被喚醒,或者被中斷
public void put(E e) throws InterruptedException {
   
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        while (count == items.length)
            // await方法會(huì)一直阻塞,直到被喚醒或者被中斷
            notFull.await();
        enqueue(e);
    } finally {
   
        lock.unlock();
    }
}

2.3 消費(fèi)者方法實(shí)現(xiàn)原理

2.3.1 remove方法
  • remove方法本身就是調(diào)用了poll方法,如果poll方法返回null,直接拋出異常
public E remove() {
   
    E x = poll();
    if (x != null)
        return x;
    else	// 沒(méi)數(shù)據(jù)拋出異常
        throw new NoSuchElementException();
}
2.3.2 poll方法
// 拉取數(shù)據(jù)
public E poll() {
   
    final ReentrantLock lock = this.lock;
    lock.lock();	// 加鎖
    try {
   
        // 若沒(méi)有數(shù)據(jù),直接返回null;否則執(zhí)行dequeue,取出數(shù)據(jù)并返回
        return (count == 0) ? null : dequeue();
    } finally {
   
        lock.unlock();
    }
}
// 取出數(shù)據(jù)
private E dequeue() {
   
    // 將成員變量引用到局部變量
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];		// 直接獲取指定索引位置的數(shù)據(jù)
    items[takeIndex] = null;		// 取出數(shù)據(jù)后,清空該索引位置
    if (++takeIndex == items.length)	// 設(shè)置下次取數(shù)據(jù)的索引位置
        takeIndex = 0;
    count--;	// 數(shù)組中元素個(gè)數(shù)減一
    if (itrs != null)	// 迭代器內(nèi)容先跳過(guò)
        itrs.elementDequeued();
    // signal方法,會(huì)喚醒當(dāng)前Condition中排隊(duì)的一個(gè)Node
    // signalAll方法,會(huì)將Condition中所有的Node,全都喚醒
    notFull.signal();
    return x;	// 返回?cái)?shù)據(jù)
}
2.3.3 poll(timeout,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
   
    long nanos = unit.toNanos(timeout);		// 轉(zhuǎn)換時(shí)間單位
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();				// 加鎖,可中斷喚醒
    try {
   
        while (count == 0) {
   	// 如果沒(méi)數(shù)據(jù)
            if (nanos <= 0)		// 也沒(méi)時(shí)間了,就不阻塞,返回null
                return null;
            // 有時(shí)間,就掛起消費(fèi)者線(xiàn)程一段時(shí)間
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();	// 取數(shù)據(jù)
    } finally {
   
        lock.unlock();
    }
}
2.3.4 take方法
public E take() throws InterruptedException {
   
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        while (count == 0)	// 使用while,防止虛假喚醒
            notEmpty.await();
        return dequeue();
    } finally {
   
        lock.unlock();
    }
}
2.3.5 虛假喚醒

阻塞隊(duì)列中,如果需要線(xiàn)程掛起操作,判斷有無(wú)數(shù)據(jù)的位置采用的是while循環(huán),為什么不使用if?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-694621.html

  • 首先肯定不能換成 if 邏輯判斷,比如:有線(xiàn)程 A、B、E、C,其中 ABE 是生產(chǎn)者,C是消費(fèi)者。假如線(xiàn)程的隊(duì)列是滿(mǎn)的,AB掛起
// E,拿到鎖資源,還沒(méi)有走while判斷
while (count == items.length)
    // A醒了
    // B掛起
    notFull.await();
enqueue(e);
  • C 此時(shí)消費(fèi)一條數(shù)據(jù),執(zhí)行 notFull.signal() 喚醒一個(gè)線(xiàn)程,A線(xiàn)程被喚醒;E走判斷發(fā)現(xiàn)有空余位置,可以添加數(shù)據(jù)到隊(duì)列,則E添加數(shù)據(jù),走enqueue。
  • 如果判斷是 if,A 在E釋放鎖資源后,拿到鎖資源,直接走 enqueue 方法,此時(shí) A線(xiàn)程就是在 putIndex 的位置,覆蓋掉之前的數(shù)據(jù),會(huì)造成數(shù)據(jù)安全問(wèn)題。

3 LinkedBlockingQueue

3.1 LinkedBlockingQueue的底層實(shí)現(xiàn)

  • 查看 LinkedBlockingQueue 是如何存儲(chǔ)數(shù)據(jù),以及如何實(shí)現(xiàn)鏈表結(jié)構(gòu)的。
// Node對(duì)象就是存儲(chǔ)數(shù)據(jù)的單位
static class Node<E> {
   
    // 存儲(chǔ)的數(shù)據(jù)
    E item;
	// 指向下一個(gè)數(shù)據(jù)的指針
    Node<E> next;
	// 有參構(gòu)造
    Node(E x) {
    item = x; }
}
  • 查看LinkedBlockingQueue的有參構(gòu)造
// 可以手動(dòng)指定LinkedBlockingQueue的長(zhǎng)度,如果沒(méi)有指定,默認(rèn)為Integer.MAX_VALUE
public LinkedBlockingQueue(int capacity) {
   
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 在初始化時(shí),構(gòu)建一個(gè)item為null的節(jié)點(diǎn),作為head和last,這種node可以成為哨兵Node,
    // 如果沒(méi)有哨兵節(jié)點(diǎn),那么在獲取數(shù)據(jù)時(shí),需要判斷head是否為null,才能找next
    // 如果沒(méi)有哨兵節(jié)點(diǎn),那么在添加數(shù)據(jù)時(shí),需要判斷l(xiāng)ast是否為null,才能找next
    last = head = new Node<E>(null);
}
  • 查看LinkedBlockingQueue的其他屬性
// 因?yàn)槭擎湵?,沒(méi)有想數(shù)組的length屬性,基于AtomicInteger來(lái)記錄長(zhǎng)度
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;	// 鏈表的頭

到了這里,關(guān)于多線(xiàn)程與高并發(fā)——并發(fā)編程(4)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Java高可用監(jiān)控中間件

    Prometheus是一個(gè)開(kāi)源的系統(tǒng)監(jiān)控和警報(bào)工具集,用于收集、存儲(chǔ)和查詢(xún)時(shí)間序列數(shù)據(jù)。 它支持多種數(shù)據(jù)源,可以監(jiān)控分布式系統(tǒng)的各種指標(biāo),并提供強(qiáng)大的查詢(xún)語(yǔ)言和靈活的警報(bào)規(guī)則。 Grafana是一個(gè)開(kāi)源的可視化監(jiān)控和分析平臺(tái),可以與多個(gè)數(shù)據(jù)源集成,包括Prometheus、InfluxD

    2024年01月23日
    瀏覽(25)
  • 【Java面試丨消息中間件】Kafka

    【Java面試丨消息中間件】Kafka

    1. 介紹 使用kafka在消息的收發(fā)過(guò)程都有可能會(huì)出現(xiàn)消息丟失 (1)生產(chǎn)者發(fā)送消息到broker丟失 (2)消息在broker中存儲(chǔ)丟失 (3)消費(fèi)者從broker接收消息丟失 2. 生產(chǎn)者發(fā)送消息到broker丟失 設(shè)置異步發(fā)送:同步發(fā)送會(huì)發(fā)生阻塞,一般使用異步發(fā)送方式發(fā)送消息 消息重試:由于網(wǎng)

    2024年02月11日
    瀏覽(30)
  • 深入理解Java消息中間件-組件-消息隊(duì)列

    引言: 消息中間件在現(xiàn)代分布式系統(tǒng)中扮演著至關(guān)重要的角色,它解決了系統(tǒng)之間異步通信和解耦的需求。而在消息中間件的架構(gòu)中,核心組件之一就是消息隊(duì)列。本文將深入探討消息隊(duì)列的架構(gòu)組件,幫助讀者加深對(duì)消息中間件的理解和應(yīng)用。 一、什么是消息隊(duì)列 消息隊(duì)列

    2024年04月27日
    瀏覽(94)
  • java后端技術(shù)匯總 + 中間件 + 架構(gòu)思想

    1. 華為OD機(jī)考題 + 答案 2023華為OD統(tǒng)一考試(A+B卷)題庫(kù)清單-帶答案(持續(xù)更新) 2023年華為OD真題機(jī)考題庫(kù)大全-帶答案(持續(xù)更新) 2. 面試題 一手真實(shí)java面試題:2023年各大公司java面試真題匯總--持續(xù)更新 3. 技術(shù)知識(shí) java后端技術(shù)匯總 + 中間件 + 架構(gòu)思想 類(lèi)型 難度 Spring、

    2024年02月13日
    瀏覽(27)
  • Java開(kāi)發(fā)框架和中間件面試題(8)

    Java開(kāi)發(fā)框架和中間件面試題(8)

    目錄 82.Mybatis一級(jí)緩存,二級(jí)緩存? 83.Mybatis如何防止SQL注入? 84.mybatis中resultType和resultMap有什么區(qū)別? 85.如何在SpringBoot中禁用Actuator斷點(diǎn)安全性? 86.什么是SpringBoot?SpringBoot有哪些優(yōu)點(diǎn)? 87.SpringBoot中的監(jiān)視器是什么? 88.什么是yaml文件? 89.如何使用SpringBoot實(shí)現(xiàn)異常處理?

    2024年02月03日
    瀏覽(15)
  • 【Java|多線(xiàn)程與高并發(fā)】線(xiàn)程安全問(wèn)題以及synchronized使用實(shí)例

    【Java|多線(xiàn)程與高并發(fā)】線(xiàn)程安全問(wèn)題以及synchronized使用實(shí)例

    Java多線(xiàn)程環(huán)境下,多個(gè)線(xiàn)程同時(shí)訪問(wèn)共享資源時(shí)可能出現(xiàn)的數(shù)據(jù)競(jìng)爭(zhēng)和不一致的情況。 線(xiàn)程安全一直都是一個(gè)令人頭疼的問(wèn)題.為了解決這個(gè)問(wèn)題,Java為我們提供了很多方式. synchronized、ReentrantLock類(lèi)等。 使用線(xiàn)程安全的數(shù)據(jù)結(jié)構(gòu),例如ConcurrentHashMap、ConcurrentLinkedQueue等

    2024年02月09日
    瀏覽(14)
  • 【Java|多線(xiàn)程與高并發(fā)】線(xiàn)程的中斷的兩種方法

    【Java|多線(xiàn)程與高并發(fā)】線(xiàn)程的中斷的兩種方法

    線(xiàn)程中斷是指在一個(gè)線(xiàn)程執(zhí)行的過(guò)程中,強(qiáng)制終止該線(xiàn)程的執(zhí)行。雖說(shuō)是中斷,但本質(zhì)上是讓run方法快點(diǎn)執(zhí)行完,而不是run方法執(zhí)行到一半,強(qiáng)制結(jié)束. 本文主要介紹線(xiàn)程中斷的兩種方法 看下面這段代碼: 運(yùn)行結(jié)果: 看下面這張圖: 在這段代碼中,定義了一個(gè) flag 的標(biāo)志位,在 線(xiàn)程

    2024年02月08日
    瀏覽(28)
  • linux傻瓜式安裝Java環(huán)境及中間件

    linux傻瓜式安裝Java環(huán)境及中間件

    1.下載 2.追加 使用vim /etc/profile 編輯profile文件 輸入 3.刷新測(cè)試 1.docker卸載 2.docker安裝 注:-p 將宿主機(jī)端口與容器端口映射,冒號(hào)左側(cè)是宿主機(jī)端口,右側(cè)是容器端 1.傻瓜式安裝安裝并配置 注:requirepass 123456 redis密碼 1.安裝 2.啟動(dòng)并配置密碼 3.配置web頁(yè)面插件 1.拉取 2.配置

    2024年02月05日
    瀏覽(19)
  • 【Java|多線(xiàn)程與高并發(fā)】定時(shí)器(Timer)詳解

    【Java|多線(xiàn)程與高并發(fā)】定時(shí)器(Timer)詳解

    在Java中,定時(shí)器 Timer 類(lèi)是用于執(zhí)行定時(shí)任務(wù)的工具類(lèi)。它允許你安排一個(gè)任務(wù)在未來(lái)的某個(gè)時(shí)間點(diǎn)執(zhí)行,或者以固定的時(shí)間間隔重復(fù)執(zhí)行。 在服務(wù)器開(kāi)發(fā)中,客戶(hù)端向服務(wù)器發(fā)送請(qǐng)求,然后等待服務(wù)器響應(yīng). 但服務(wù)器什么時(shí)候返回響應(yīng),并不確定. 但也不能讓客戶(hù)端一直等下去

    2024年02月07日
    瀏覽(20)
  • Java中支持分庫(kù)分表的框架/組件/中間件簡(jiǎn)介

    Java中支持分庫(kù)分表的框架/組件/中間件簡(jiǎn)介

    列舉一些比較常見(jiàn)的,簡(jiǎn)單介紹一下: sharding-jdbc(當(dāng)當(dāng)) TSharding(蘑菇街) Atlas(奇虎360) Cobar(阿里巴巴) MyCAT(基于Cobar) TDDL(淘寶) Vitess(谷歌) 首先,第一個(gè),可能也是最常見(jiàn)最常用的,Sharding-JDBC,這個(gè)是最早的名字,現(xiàn)在已經(jīng)發(fā)展成為ShardingSphere,生態(tài),詳細(xì)

    2024年02月10日
    瀏覽(27)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包