1、阻塞隊(duì)列
隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)。而阻塞隊(duì)列也是一種特殊的隊(duì)列,也遵守”先進(jìn)先出“的原則。
阻塞隊(duì)列是一種線程安全的的數(shù)據(jù)結(jié)構(gòu),并且具有以下特性:
1、隊(duì)列往進(jìn)寫(xiě)元素是從隊(duì)尾插入,隊(duì)首取出
2、當(dāng)插入元素的時(shí)候,先判斷一下,隊(duì)列是否已經(jīng)滿了,如果滿了就繼續(xù)等(阻塞),等到隊(duì)列有空余的位置的時(shí)候再去插入。
3、當(dāng)取出元素的時(shí)候,先判斷一下,隊(duì)列是否為空,如果空了就繼續(xù)等(阻塞),等到隊(duì)列中有元素的時(shí)候再去取出。
阻塞隊(duì)列有一個(gè)典型的應(yīng)用場(chǎng)景就是”生產(chǎn)者消費(fèi)者模型“,這是一種非常典型的開(kāi)發(fā)模型。
生產(chǎn)者消費(fèi)者模型:
生產(chǎn)者和消費(fèi)者模式就是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者強(qiáng)耦合問(wèn)題。
生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而是通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用原地等待消費(fèi)者進(jìn)行處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列中獲取。
1、阻塞隊(duì)列相當(dāng)于是一個(gè)緩沖區(qū),平衡了消費(fèi)者和生產(chǎn)者之間的處理能力
比如在一些購(gòu)物軟件有"秒殺"場(chǎng)景,服務(wù)器在同一時(shí)刻可能會(huì)收到大量的支付請(qǐng)求,如果直接處理這些請(qǐng)求,服務(wù)器很有可能會(huì)扛不住大量數(shù)據(jù)的沖擊(每一個(gè)支付請(qǐng)求的處理都需要比較復(fù)雜的流程),這個(gè)時(shí)候就可以把它放到一個(gè)阻塞隊(duì)列中,然后由服務(wù)器慢慢處理每個(gè)支付請(qǐng)求。
這樣做可以有效的進(jìn)行“削峰",防止服務(wù)器被突然到來(lái)的一波請(qǐng)求直接沖垮。
2、阻塞隊(duì)列也能使生產(chǎn)者和消費(fèi)者之間解耦
比如過(guò)年一家人在一起包餃子,一般都是分工明確,比如一個(gè)人負(fù)責(zé)搟餃子皮,其他人負(fù)責(zé)包餃子,那么搟餃子皮的人就是生產(chǎn)者,包餃子的人就是消費(fèi)者。
搟餃子皮的不關(guān)心包餃子的人是誰(shuí),只管搟包子皮,包餃子的人也不管搟包子皮的人是誰(shuí),只管包餃子。
?
?1.1、消息隊(duì)列
消息隊(duì)列是阻塞隊(duì)列一種典型應(yīng)用,基于消費(fèi)者生產(chǎn)者模型實(shí)現(xiàn)的,是在業(yè)務(wù)的驅(qū)使下,應(yīng)用隊(duì)列這個(gè)數(shù)據(jù)結(jié)構(gòu)做了一些自定義的功能開(kāi)發(fā),滿組一些真實(shí)業(yè)務(wù)工作,類似這樣的框架或是軟件,被叫做“中間件”。
工作原理:
1、正常隊(duì)列,先進(jìn)先出,完全遵守這個(gè)順序。
2、消息隊(duì)列,把每個(gè)消息打了個(gè)”標(biāo)簽“。標(biāo)簽可以理解為”類型",把消息類型分類
1.2、為什么要使用消息隊(duì)列(阻塞隊(duì)列)?
1、解耦
現(xiàn)在的程序盡量做到高內(nèi)聚,低耦合。 也就是業(yè)務(wù)強(qiáng)相關(guān)的代碼放到一起,為了維護(hù)程序方便,設(shè)計(jì)和組織代碼的一種方式。需要哪一種方法去接口調(diào)用即可,避免代碼分散開(kāi)發(fā)。
?2、削峰填谷
微博很難應(yīng)對(duì)流量暴增的情況,流量暴增會(huì)在系統(tǒng)中申請(qǐng)很多很多線程,各種資源,最終會(huì)瞬間把服務(wù)器資源耗盡。
?
?陶寶應(yīng)對(duì)流量沖擊案例:
?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-502089.html
削峰:在流量暴增的時(shí)候用消息隊(duì)列把消息緩存起來(lái),后面的服務(wù)器一點(diǎn)一點(diǎn)正常處理。
填谷:消費(fèi)信息的服務(wù)器在流量不多的情況下,處理之前堆積的消息,就是填谷
3、異步操作
在發(fā)起請(qǐng)求到接收到響應(yīng)的過(guò)程中,啥也不干,叫做同步;如果發(fā)起請(qǐng)求之后去執(zhí)行別任務(wù),那么就叫做異步。
1.3、標(biāo)準(zhǔn)庫(kù)中的阻塞隊(duì)列?
在java標(biāo)準(zhǔn)庫(kù)中內(nèi)置了阻塞隊(duì)列,如果需要在一些程序中使用阻塞隊(duì)列,直接使用標(biāo)準(zhǔn)庫(kù)中的即可。
*BlockingQueue是一個(gè)接口,真正實(shí)現(xiàn)類的是LinkedBlockingQueue。
*put方法用于阻塞式的入隊(duì)列,take用于阻塞式的出隊(duì)列。
*BlockingQueue也有offer,poll,peek等方法,但是這些方法不帶有阻塞特性。?
?代碼示例:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Exe_01 {
public static void main(String[] args) throws InterruptedException {
//JDK提供的創(chuàng)建方式
BlockingQueue<Integer> queue=new LinkedBlockingQueue<>(3);
//往阻塞隊(duì)列添加3個(gè)元素
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("添加了3個(gè)元素");
//再添加第四個(gè)
//queue.put(4);
//System.out.println("添加了4個(gè)元素");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println("take了三個(gè)元素");
}
}
?1.4、阻塞隊(duì)列的實(shí)現(xiàn)
*通過(guò)“循環(huán)隊(duì)列”的方式來(lái)實(shí)現(xiàn)
*使用synchronized來(lái)進(jìn)行加鎖控制
*put插入元素的時(shí)候,判定如果隊(duì)列滿的話,就進(jìn)行wait。(注意:要在循環(huán)的時(shí)候進(jìn)行wait,被喚醒時(shí)不一定隊(duì)列就不滿了,因?yàn)橥瑫r(shí)可能喚醒了多個(gè)線程)
*take取出元素的時(shí)候,判定如果隊(duì)列為空,就進(jìn)行wait。(也是循環(huán)wait)
阻塞隊(duì)列實(shí)現(xiàn)分析:
1、在普通隊(duì)列的基礎(chǔ)上加了等待操作,在入隊(duì)時(shí)如果隊(duì)列已滿就要等,出隊(duì)時(shí)隊(duì)列為空就要等
2、在普通隊(duì)列的基礎(chǔ)上加上了喚醒操作,執(zhí)行完入隊(duì)操作就會(huì)喚醒出隊(duì)線程,執(zhí)行完出隊(duì)操作就會(huì)喚醒入隊(duì)線程
阻塞隊(duì)列不可能出現(xiàn)即是空的,又是滿的這種狀態(tài),所以不會(huì)出現(xiàn)相互等待的現(xiàn)象。
?
代碼示例:
/**
* 實(shí)現(xiàn)阻塞隊(duì)列
*/
public class MyBlockingQueue {
//需要一個(gè)數(shù)組來(lái)保存數(shù)據(jù)
private Integer[] elementData=new Integer[20];
//定義隊(duì)首隊(duì)尾的下標(biāo)
private volatile int head=0;
private volatile int tail=0;
//有效元素的個(gè)數(shù)
private volatile int size=0;
/**
* 添加元素
* @param value
*/
public void put(Integer value) throws InterruptedException {
synchronized(this){
//判斷隊(duì)列是否已經(jīng)滿了
if(size>=elementData.length){
this.wait();
}
//從隊(duì)尾入隊(duì)
elementData[tail]=value;
//隊(duì)尾向前移動(dòng)
tail++;
//處理循環(huán)
if(tail>= elementData.length){
tail=0;
}
size++;
//添加新的元素喚醒線程
this.notifyAll();
}
}
/**
* 獲取元素
* @return
*/
public Integer take() throws InterruptedException {
synchronized(this){
//先判斷是否為空
if(size==0){
//出隊(duì)時(shí),如果為空,繼續(xù)等待
this.wait();
}
//出隊(duì)隊(duì)首元素
Integer value=elementData[head];
//向后移動(dòng)head
head++;
//處理循環(huán)
if(head>=elementData.length){
head=0;
}
size--;
//出隊(duì)時(shí)喚醒其他線程
this.notifyAll();
return value;
}
}
}
public class Exe_02 {
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue queue=new MyBlockingQueue();
//往阻塞隊(duì)列添加三個(gè)元素
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("添加了3個(gè)元素");
//再添加第四個(gè)
//queue.put(4);
//System.out.println("添加了4個(gè)元素");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println("take了三個(gè)元素");
}
}
運(yùn)行結(jié)果:
1.5、實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
代碼示例:
public class Exe_03 {
//用兩個(gè)線程生產(chǎn)者消費(fèi)者模型
public static void main(String[] args) {
//創(chuàng)建一個(gè)阻塞隊(duì)列,表示交易場(chǎng)所
MyBlockingQueue queue=new MyBlockingQueue();
//創(chuàng)建生產(chǎn)者線程
Thread producer=new Thread(() ->{
int num=0;
while(true){
try {
queue.put(num);
System.out.println("生產(chǎn)了元素:"+num);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生產(chǎn)者");
//啟動(dòng)線程
producer.start();
//創(chuàng)建消費(fèi)者線程
Thread consumer=new Thread(() ->{
while(true) {
try {
Integer value = queue.take();
//睡眠一會(huì)
Thread.sleep(1000);
System.out.println("消費(fèi)了了元素" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消費(fèi)者");
//啟動(dòng)線程
consumer.start();
}
}
?運(yùn)行結(jié)果:
現(xiàn)象就是 ,生產(chǎn)者都把隊(duì)列填滿后,開(kāi)始阻塞,等消費(fèi)者一點(diǎn)一點(diǎn)消費(fèi),生產(chǎn)者一點(diǎn)一點(diǎn)生產(chǎn)。
調(diào)用wait()解決虛假喚醒問(wèn)題?
?
更新代碼:?文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-502089.html
?
public class Exe_03 {
//用兩個(gè)線程生產(chǎn)者消費(fèi)者模型
public static void main(String[] args) {
//創(chuàng)建一個(gè)阻塞隊(duì)列,表示交易場(chǎng)所
MyBlockingQueue queue=new MyBlockingQueue();
//創(chuàng)建生產(chǎn)者線程
Thread producer=new Thread(() ->{
int num=0;
while(true){
try {
queue.put(num);
System.out.println("生產(chǎn)了元素:"+num);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"生產(chǎn)者");
//啟動(dòng)線程
producer.start();
//創(chuàng)建消費(fèi)者線程
Thread consumer=new Thread(() ->{
while(true) {
try {
Integer value = queue.take();
//睡眠一會(huì)
Thread.sleep(1000);
System.out.println("消費(fèi)了了元素" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消費(fèi)者");
//啟動(dòng)線程
consumer.start();
}
}
到了這里,關(guān)于阻塞隊(duì)列(消息隊(duì)列)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!