多線程案例
1、案例一:線程安全的單例模式
單例模式
單例模式是設(shè)計(jì)模式的一種
什么是設(shè)計(jì)模式?
設(shè)計(jì)模式好比象棋中的 “棋譜”,紅方當(dāng)頭炮,黑方馬來跳,針對紅方的一些走法,黑方應(yīng)招的時候有一些固定的套路,按照套路來走局勢就不會吃虧,也就發(fā)明了一組"棋譜",稱為設(shè)計(jì)模式軟件開發(fā)中也有很多常見的 “問題場景”,針對一些典型的場景,給出了一些典型的解決方案
有兩個設(shè)計(jì)模式是非常常見的
其一是單例模式,其二是工廠模式
單例模式 => 單個 實(shí)例 (對象)
在有些場景中,有的特定的類,只能創(chuàng)建出一個實(shí)例,不應(yīng)該創(chuàng)建多個實(shí)例
單例模式能保證某個類在程序中只存在唯一一份實(shí)例, 而不會創(chuàng)建出多個實(shí)例,這種單例模式,在實(shí)際開發(fā)中是非常常見,也非常有用的,開發(fā)中的很多 “概念” 天然就是單例,JDBC,DataSource,這樣的對象,就應(yīng)該是單例的
Java 里實(shí)現(xiàn)單例模式的方式有很多,單例模式的兩種典型實(shí)現(xiàn):
- 餓漢模式
- 懶漢模式
舉例:洗碗
1.中午這頓飯,使用了4個碗,吃完之后,立即把這4個碗給洗了~~[餓漢]
⒉中午這頓飯,使用了4個碗,吃完之后,先不洗。晚上這頓,只需要2個碗,然后就只洗2個即可~~[懶漢]
第二種是更加高效的操作,—般是褒義詞 (在計(jì)算機(jī)中提高效率)
餓漢的單例模式,是比較著急地去進(jìn)行創(chuàng)建實(shí)例
懶漢的單例模式,是不太著急地去創(chuàng)建實(shí)例,只是在用的時候,才真正創(chuàng)建
1.1、餓漢模式
private static Singleton instance;
注意:
-
類里面使用 static 修飾的成員,應(yīng)該叫做 “類成員” => “類屬性 / 類方法”,相當(dāng)于這個屬性對應(yīng)的內(nèi)存空間在類對象里面
不加 static 修飾的成員,叫做 “實(shí)例成員” => “實(shí)例屬性 / 實(shí)例方法”靜態(tài)變量 屬于類,存儲在方法區(qū),隨著的類加載而加載,
成員變量 屬于對象,存儲在堆中,隨著對象的創(chuàng)建而創(chuàng)建-
static 是讓當(dāng)前 instance 屬性是類屬性了
-
一個類對象在一個 Java 進(jìn)程中是唯一實(shí)例的 (JVM保證的),類屬性是長在類對象上的,進(jìn)一步的也就保證了類的 static 成員也是只有一份的
-
-
類對象 != 對象
類:就相當(dāng)于實(shí)例的模板,基于模板可以創(chuàng)建出很多的對象來
對象(實(shí)例)- java 代碼中的每個類,都會在編譯完成后得到 .class文件,類對象,就是 .class 文件
JVM 運(yùn)行時就會加載這個 .class 文件讀取其中的二進(jìn)制指令,并解析,在內(nèi)存中構(gòu)造出對應(yīng)的類對象 (類加載),形如 Singleton.class) - 類對象里就有 .class 文件中的一切信息
包括:類名是啥,類里有哪些屬性,每個屬性叫啥名字,每個屬性叫啥類型,每個屬性是 public private…
基于這些信息,才能實(shí)現(xiàn)反射
- java 代碼中的每個類,都會在編譯完成后得到 .class文件,類對象,就是 .class 文件
// 通過 Singleton 這個類來實(shí)現(xiàn)單例模式,保證 Singleton 這個類只有唯一實(shí)例
class Singleton {
// 1.使用 static 創(chuàng)建一個實(shí)例,并且立即進(jìn)行實(shí)例化
// 這個 instance 對應(yīng)的實(shí)例,就是該類的唯一實(shí)例
private static Singleton instance = new Singleton();
// 2.提供一個方法,讓外面能夠拿到唯一實(shí)例
public static Singleton getInstance() {
return instance;
}
// 3.為了防止程序猿在其他地方不小心地 new 這個 Singleton,就可以把構(gòu)造方法設(shè)為 private
// 把構(gòu)造方法設(shè)為 private.在類外面,就無法通過 new的方式來創(chuàng)建這個 Singleton實(shí)例了!
private Singleton() {};
}
public class demo1 {
public static void main(String[] args) {
Singleton instance = Singleton.getInstance();
Singleton instance2 = Singleton.getInstance();
System.out.println(instance == instance2); // true 兩個引用相同
}
}
針對這個唯一實(shí)例的初始化,比較著急,類加載階段,就會直接創(chuàng)建實(shí)例
(程序中用到了這個類,就會立即加載)
餓漢模式中 getlnstance,僅僅是讀取了變量的內(nèi)容
如果多個線程只是讀同一個變量,不修改,此時仍然是線程安全的
1.2、懶漢模式 - 單線程
class Singleton2 {
// 1.就不是立即就初始化實(shí)例.
private static Singleton2 instance = null;
// 2.把構(gòu)造方法設(shè)為 private
private Singleton2() {}
// 3.提供一個方法來獲取到上述單例的實(shí)例
// 只有當(dāng)真正需要用到這個實(shí)例的時候,才會真正去創(chuàng)建這個實(shí)例
public static Singleton2 getInstance() {
if (instance == null) {
instance = new Singleton2();
}
return instance;
}
}
只有在真正使用到 getInstance 的時候才會真的創(chuàng)建實(shí)例
一個典型的案例:notepad
這樣的程序,在打開大文件的時候是很慢的 (你要打開一個1G大小的文件,此時 notepad 就會嘗試把這 1G 的所有內(nèi)容都讀到內(nèi)存中) [餓漢]
像一些其他的程序,在打開大文件的時候就有優(yōu)化 (要打開 1G 的文件,但是只先加載這—個屏幕中能顯示出來的部分) [懶漢]
1.3、懶漢模式 - 線程安全
真正要解決的問題,是實(shí)現(xiàn)一個線程安全的單例模式
線程安全不安全,具體指的是多線程環(huán)境下,并發(fā)的調(diào)用 getInstance 方法,是否可能存在 bug
——懶漢模式 與 餓漢模式 在多線程環(huán)境下,是否線程安全?
-
餓漢模式這里,多線程調(diào)用,只是涉及到"讀操作"
-
懶漢模式中,包含讀操作和修改操作,存在線程安全問題
上述羅列出了一種可能的排序情況,實(shí)際情況是有很多種
通過上述分析,就可以看出,當(dāng)前這個代碼中是存在bug,可能導(dǎo)致實(shí)例被創(chuàng)建出多份來
如何保證懶漢模式的線程安全呢?加鎖!
可不是說,代碼中有了 synchronized 就—定線程安全,synchronized 加的位置也得正確,不能隨便寫
本質(zhì)是讀,比較,寫,這三個操作不是原子的。這就導(dǎo)致了 t2 讀到的值可能是 t1 還沒來得及寫的(臟讀),導(dǎo)致多次 new;所以要把鎖加在外面,此時才能保證 讀操作 和 修改操作 是一個整體
使用這里的類對象作為鎖對象
(類對象在一個程序中只有唯——份,就能保證多個線程調(diào)用 getInstance 的時候都是針對同一個對象進(jìn)行的加鎖)
public static Singleton2 getInstance() {
synchronized (Singleton.class) { // 類對象作為鎖對象
if (instance == null) {
instance = new Singleton2();
}
}
return instance;
}
1.4、懶漢模式 - 鎖競爭
當(dāng)前雖然加鎖之后,線程安全問題得到解決了,但是又有了新的問題:
對于剛才這個懶漢模式的代碼來說, 線程不安全,是發(fā)生在 instance 被初始化之前的,未初始化的時候,多線程調(diào)用 getinstance,就可能同時涉及到讀和修改,但是一旦 instance 被初始化之后,后續(xù)調(diào)用 getlnstance,此時 instance 的值一定是非空的,if 判斷不成立,也就線程安全了,因此就會直接觸發(fā) return,getlnstance 就只剩下兩個讀操作,相當(dāng)于一個是比較操作,一個是返回操作,這兩個都是讀操作
而按照上述的加鎖方式,無論代碼是初始化之后,還是初始化之前。加鎖是有開銷的,每次調(diào)用 getinstance 都會進(jìn)行加鎖,也就意味著即使是初始化之后 (已經(jīng)線程安全了),但是仍然存在大量的鎖競爭 加鎖確實(shí)能讓代碼保證線程安全,也付出了代價 (程序的速度就慢了)
所以為啥不推薦使用 vector hashtable ?? 就是因?yàn)檫@倆類里面就是在無腦加鎖
改進(jìn)方案,在加鎖這里再加上一層條件判定即可,對象還沒創(chuàng)建,才進(jìn)行加鎖;對象創(chuàng)建過了,就不再加鎖了,
條件就是當(dāng)前是否已經(jīng)初始化完成 (instance == null)
class Singleton2 {
// 1.就不是立即就初始化實(shí)例.
private static Singleton2 instance = null;
// 2.把構(gòu)造方法設(shè)為 private
private Singleton2() {}
// 3.提供一個方法來獲取到上述單例的實(shí)例
// 只有當(dāng)真正需要用到這個實(shí)例的時候,才會真正去創(chuàng)建這個實(shí)例
public static Singleton2 getInstance() {
if (instance == null) {
synchronized (Singleton.class) { // 類對象作為鎖對象
if (instance == null) {
instance = new Singleton2();
}
}
}
return instance;
}
}
這倆條件—模一樣,只是一個美麗的巧合而已,這倆條件起到的效果 / 預(yù)期的目的是完全不—樣的
上面的條件判定的是是否要加鎖
下面的條件判定的是是否要創(chuàng)建實(shí)例
碰巧這兩個目的都是判定 instance 是否為 null
在這個代碼中,看起來兩個—樣的 if 條件是相鄰的,但是實(shí)際上這兩個條件的執(zhí)行時機(jī)是差別很大的!
加鎖可能導(dǎo)致線程阻塞,當(dāng)執(zhí)行到鎖結(jié)束之后,執(zhí)行到第二個 if 的時候,第二個 if 和第一個 if 之間可能已經(jīng)隔了很久的時間,滄海桑田。程序的運(yùn)行內(nèi)部的狀態(tài),這些變量的值,都可能已經(jīng)發(fā)生很大改變了。 如外層條件是 10:16 執(zhí)行的,里層條件可能是 10:30 執(zhí)行的,此時 instance 可能已經(jīng)被其他線程給修改了。
如果去掉了里層的 if 就變成了剛才那個典型的錯誤代碼,加鎖沒有把讀+修改這操作進(jìn)行打包
public static Singleton2 getInstance() {
if (instance == null) { // 判定的是是否要加鎖
synchronized (Singleton.class) {
instance = new Singleton2();
}
}
return instance;
}
1.5、懶漢模式 - 內(nèi)存可見性 指令重排序
當(dāng)前這個代碼中還存在一個重要的問題
如果多個線程,都去調(diào)用這里的 getlnstance
就會造成大量的讀 instance 內(nèi)存的操作 => 可能會讓編譯器把這個讀內(nèi)存操作優(yōu)化成讀寄存器操作
—旦這里觸發(fā)了優(yōu)化,后續(xù)如果第一個線程已經(jīng)完成了針對 instance 的修改,那么緊接著后面的線程都感知不到這個修改,仍然把 instance 當(dāng)成 null
另外,還會涉及到指令重排序問題!!
instance = new Singleton();
拆分成三個步驟:
1.申請內(nèi)存空間
2.調(diào)用構(gòu)造方法,把這個內(nèi)存空間初始化成一個合理的對象
3.把內(nèi)存空間的地址賦值給 instance 引用
正常情況下,是按照 123 這個順序來執(zhí)行的
編譯器還有一手操作,指令重排序:為了提高程序效率,調(diào)整代碼執(zhí)行順序
123 這個順序就可能變成 132
如果是單線程,123 和 132 沒有本質(zhì)區(qū)別
例如食堂阿姨打飯,1 是拿盤子,2 是裝飯,3 是把盤子給我。此時,就是先把盤子給我,再裝飯
但是多線程環(huán)境下,就會有問題了!!!
假設(shè) t1 是按照 132 的步驟執(zhí)行的
t1 執(zhí)行到 13 之后,執(zhí)行 2 之前,被切出 cpu,t2 來執(zhí)行
(當(dāng) t1 執(zhí)行完 3 之后,t2 看起來,此處的引用就非空了),此時此刻,t2 就相當(dāng)于直接返回了 instance 引用,并且可能會嘗試使用引用中的屬性
但是由于 t1 中的 2(裝飯) 操作還沒執(zhí)行完呢,t2 拿到的是非法的對象,還沒構(gòu)造完成的不完整的對象
解決方法:給 instance 加上 volatile 即可
// 這個代碼是完全體的線程安全單例模式
class Singleton2 {
// 1.就不是立即就初始化實(shí)例.
private static volatile Singleton2 instance = null;
// 2.把構(gòu)造方法設(shè)為 private
private Singleton2() {}
// 3.提供一個方法來獲取到上述單例的實(shí)例
// 只有當(dāng)真正需要用到這個實(shí)例的時候,才會真正去創(chuàng)建這個實(shí)例
public static Singleton2 getInstance() {
if (instance == null) { // 判定的是是否要加鎖
synchronized (Singleton.class) { // 類對象作為鎖對象
if (instance == null) { // 判定的是是否要創(chuàng)建實(shí)例
instance = new Singleton2();
}
}
}
return instance;
}
}
2、案例二:阻塞隊(duì)列
2.1、生產(chǎn)者消費(fèi)者模型
隊(duì)列先進(jìn)先出
阻塞隊(duì)列同樣也是一個符合先進(jìn)先出規(guī)則的特殊隊(duì)列,相比于普通隊(duì)列,阻塞隊(duì)列又有一些其他方面的功能!
1、線程安全
2、產(chǎn)生阻塞效果
1). 如果隊(duì)列為空,執(zhí)行出隊(duì)列操作,就會出現(xiàn)阻塞,阻塞到另一個線程往隊(duì)列里添加元素(隊(duì)列不為空)為止
2). 如果隊(duì)列為滿,執(zhí)行入隊(duì)列操作,也會出現(xiàn)阻塞,阻塞到另一個線程從隊(duì)列里取走元素(隊(duì)列不為滿)為止
消息隊(duì)列,也是特殊的隊(duì)列,相當(dāng)于是在阻塞隊(duì)列的基礎(chǔ)上,加上了個 "消息的類型”,按照制定類別進(jìn)行先進(jìn)先出
此時咱們談到的這個消息隊(duì)列, 仍然是一個 “數(shù)據(jù)結(jié)構(gòu)”
基于上述特性,就可以實(shí)現(xiàn) “生產(chǎn)者消費(fèi)者模型”
此處的阻塞隊(duì)列就可以作為生產(chǎn)者消費(fèi)者模型中的交易場所
生產(chǎn)者消費(fèi)者模型,是實(shí)際開發(fā)中非常有用的一種多線程開發(fā)手段!尤其是在服務(wù)器開發(fā)的場景中
假設(shè),有兩個服務(wù)器, AB,A作為入口服務(wù)器直接接收用戶的網(wǎng)絡(luò)請求,B作為應(yīng)用服務(wù)器,來給A提供一些數(shù)據(jù)
優(yōu)點(diǎn)1:解耦合
實(shí)現(xiàn)了發(fā)送發(fā)和接受方之間的解耦
——開發(fā)中典型的場景:服務(wù)器之間的相互調(diào)用
客戶端發(fā)送一個充值請求給 A 服務(wù)器,此時 A 把請求轉(zhuǎn)發(fā)給 B 處理,B 處理完了把結(jié)果反饋給 A,此時就可以視為是 “A 調(diào)用了 B”,
如果不使用生產(chǎn)者消費(fèi)者模型
上述場景中,A 和 B 之間的耦合性是比較高的! A 要調(diào)用 B,A 務(wù)必要知道 B的存在,如果 B 掛了,很容易引起 A 的 bug !!!(在開發(fā) A 代碼的時候就得充分了解到 B 提供的一些接口,開發(fā) B 代碼的時候也得充分了解到 A 是怎么調(diào)用的)
另外,如果要是再加一個 C 服務(wù)器,此時也需要對 A 修改不少代碼
因此就需要針對 A 重新修改代碼,重新測試,重新發(fā)布,重新部署,非常麻煩了
針對上述場景,使用生產(chǎn)者消費(fèi)者模型,就可以有效的降低耦合
對于請求:A是生產(chǎn)者,B是消費(fèi)者
對于響應(yīng):A是消費(fèi)者,B是生產(chǎn)者
阻塞隊(duì)列都是作為交易場所,隊(duì)列是不變
A 不需要認(rèn)識 B,只需要關(guān)注如何和隊(duì)列交互 (A 的代碼中,沒有任何一行代碼和 B 相關(guān))
B 不需要認(rèn)識 A,也只需要關(guān)注如何和隊(duì)列交互 (B 的代碼中,也沒有任何一行代碼和 A 相關(guān))
如果 B 掛了,對于 A 沒有任何影響,因?yàn)殛?duì)列還好著,A 仍然可以給隊(duì)列插入元素,如果隊(duì)列滿,就先阻塞就好了,
如果 A 掛了,也對于 B 沒有影響,因?yàn)殛?duì)列還好著,B 仍然可以從隊(duì)列取元素,如果隊(duì)列空,也就先阻塞就好了
A B 任何一方掛了不會對對方造成影響!!!
新增一個 C 來作為消費(fèi)者,對于 A 來說,也完全感知不到…
優(yōu)點(diǎn)2:削峰填谷
能夠?qū)τ谡埱筮M(jìn)行 “削峰填谷”,保證系統(tǒng)的穩(wěn)定性
——三峽大壩,起到的效果,就是 “削峰填谷”
到了雨季,水流量就會很大,三峽大壩關(guān)閘蓄水,承擔(dān)了上游的沖擊,保護(hù)下游水流量不是太大,不至于出現(xiàn)洪災(zāi)——削峰
到了早季,水流量很小,三峽大壩就開閘放水,給下游提供更充分的水源,避免出現(xiàn)干旱災(zāi)害——填谷
什么時候上游漲水,真的是難以預(yù)測,防患于未然
上游,就是用戶發(fā)送的請求。下游就是一些執(zhí)行具體業(yè)務(wù)的服務(wù)器。
用戶發(fā)多少請求?不可控的,有的時候,請求多,有的時候請求少…
——未使用生產(chǎn)者消費(fèi)者模型:
未使用生產(chǎn)者消費(fèi)者模型的時候,如果請求量突然暴漲 (不可控)
A暴漲 => B暴漲
A 作為入口服務(wù)器,計(jì)算量很輕,請求暴漲,問題不大
B 作為應(yīng)用服務(wù)器,計(jì)算量可能很大,需要的系統(tǒng)資源也更多,如果請求更多了,需要的資源進(jìn)一步增加,如果主機(jī)的硬件不夠,可能程序就掛了
——使用生產(chǎn)者消費(fèi)者模型:
A 請求暴漲 => 阻塞隊(duì)列的請求暴漲
由于阻塞隊(duì)列沒啥計(jì)算量,就只是單純的存?zhèn)€數(shù)據(jù),就能抗住更大的壓力
B 這邊仍然按照原來的速度來消費(fèi)數(shù)據(jù),不會因?yàn)锳的暴漲而引起暴漲,B就被保護(hù)的很好,就不會因?yàn)檫@種請求的波動而引起崩潰
“削峰”:這種峰值很多時候不是持續(xù)的,就一陣,過去了就又恢復(fù)了
“填谷”:B 仍然是按照原有的頻率來處理之前積壓的數(shù)據(jù)
實(shí)際開發(fā)中使用到的 “阻塞隊(duì)列” 并不是一個簡單的數(shù)據(jù)結(jié)構(gòu)了,而是一個 / 一組專門的服務(wù)器程序,并且它提供的功能也不僅僅是阻塞隊(duì)列的功能,還會在這基礎(chǔ)之上提供更多的功能 (對于數(shù)據(jù)持久化存儲,支持多個數(shù)據(jù)通道,支持多節(jié)點(diǎn)容災(zāi)冗余備份,支持管理面板,方便配置參數(shù).……)
這樣的隊(duì)列又起了個新的名字,"消息隊(duì)列” (未來開發(fā)中廣泛使用到的組件)
kafka 就是業(yè)界一個比較主流的消息隊(duì)列,消息隊(duì)列的實(shí)現(xiàn),有很多種,核心功能都差不多
2.2、實(shí)現(xiàn)阻塞隊(duì)列
學(xué)會使用 Java 標(biāo)準(zhǔn)庫中的阻塞隊(duì)列,基于這個內(nèi)置的阻塞隊(duì)列,實(shí)現(xiàn)一個簡單的生產(chǎn)者消費(fèi)者模型
再自己**實(shí)現(xiàn)一個簡單的阻塞隊(duì)列 **(為了更好地理解阻塞隊(duì)列的原理,多線程,尤其是鎖操作)
標(biāo)準(zhǔn)庫中的阻塞隊(duì)列 BlockingQueue
在 Java 標(biāo)準(zhǔn)庫中內(nèi)置了阻塞隊(duì)列,如果我們需要在一些程序中使用阻塞隊(duì)列,直接使用標(biāo)準(zhǔn)庫中的即可
-
BlockingQueue 是一個接口,真正實(shí)現(xiàn)的類是 LinkedBlockingQueue
-
Queue 提供的方法有三個:入隊(duì)列 offer。出隊(duì)列 poll。取隊(duì)首元素 peek
阻塞隊(duì)列主要方法是兩個:入隊(duì)列 put,出隊(duì)列 take
-
BlockingQueue 也有 offer, poll, peek 等方法,但是這些方法不帶有阻塞特性
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class demo3 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.put("hello");
String s1 = blockingQueue.take();
System.out.println(s1);
blockingQueue.take();
String s2 = blockingQueue.take();
System.out.println(s2);
}
}
取出 “hello”,隊(duì)列為空,此時再次取元素,就會進(jìn)入阻塞,等待其他線程往隊(duì)列中添加元素
生產(chǎn)者消費(fèi)者模型
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadDemo1 {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
// 創(chuàng)建兩個線程,作為生產(chǎn)者和消費(fèi)者
Thread customer = new Thread(() -> {
while (true) {
try {
Integer result = blockingQueue.take();
System.out.println("消費(fèi)元素:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread(() -> {
int count = 0;
while (true) {
try {
blockingQueue.put(count);
System.out.println("生產(chǎn)元素:" + count);
count++;
Thread.sleep(500); // 每500毫秒生產(chǎn)一個
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
阻塞隊(duì)列 - 單線程
要實(shí)現(xiàn)一個阻塞隊(duì)列,需要先寫一個普通的隊(duì)列,再加上線程安全,再加上阻塞
隊(duì)列可以基于數(shù)組實(shí)現(xiàn),也可以基于鏈表實(shí)現(xiàn)
——鏈表:很容易進(jìn)行頭刪 / 尾插
鏈表的頭刪操作,時間復(fù)雜度是 O(1)
鏈表的尾插操作,時間復(fù)雜度是 “可以是 O(1)"
用一個額外的引用,記錄當(dāng)前的尾結(jié)點(diǎn)
——數(shù)組:循環(huán)隊(duì)列
[head, tail) 都指向下標(biāo)為 0
入隊(duì)列,把新元素放到 tail 位置上,并且 tail++
出隊(duì)列,把 head 位置的元素返回出去,并且 head++
當(dāng) head / tail 到達(dá)數(shù)組末尾之后,就需要從頭開始,重新循環(huán)
實(shí)現(xiàn)循環(huán)隊(duì)列的時候,有一個重要的問題,如何區(qū)分,是空隊(duì)列還是滿隊(duì)列?
如果不加額外限制,此時隊(duì)列空或者滿都是 head 和 tail 重合
-
浪費(fèi)一個格子,head == tail 認(rèn)為是空
head == tail+1 認(rèn)為是滿 -
額外創(chuàng)建一個變量,size 記錄元素的個數(shù),size == 0 空
size == arr.length 滿
class MyBlockingQueue {
// 保存數(shù)據(jù)的本體
private int[] items = new int[1000];
// 隊(duì)首下標(biāo)
private int head = 0;
// 隊(duì)尾下標(biāo)
private int tail = 0;
// 有效元素個數(shù)
private int size = 0;
// 入隊(duì)列
public void put(int value) {
// 1、
if (size == items.length) {
// 隊(duì)列滿了,暫時先直接返回
return;
}
// 2、把新的元素放入 tail 位置
items[tail] = value;
tail++;
// 3、處理 tail 到達(dá)數(shù)組末尾的情況
if (tail >= items.length) { // 判定 + 賦值 (雖然是兩個操作,兩個操作都是高效操作)
tail = 0;
}
// tail = tail % data.length; // 代碼可讀性差,除法速度不如比較,不利于開發(fā)效率也不利于運(yùn)行效率
// 4、插入完成,修改元素個數(shù)
size++;
}
// 出隊(duì)列
public Integer take() {
// 1、
if (size == 0) {
// 如果隊(duì)列為空,返回一個非法值
return null;
}
// 2、取出 head 位置的元素
int ret = items[head];
head++;
// 3、head 到末尾 重新等于 0
if (head >= items.length) {
head = 0;
}
// 4、數(shù)組元素個數(shù)--
size--;
return ret;
}
}
public class TestDemo {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
queue.put(1);
queue.put(2);
queue.put(3);
queue.put(4);
System.out.println(queue.take()); // 1
System.out.println(queue.take()); // 2
System.out.println(queue.take()); // 3
System.out.println(queue.take()); // 4
}
}
阻塞隊(duì)列 - 線程安全
當(dāng)前已經(jīng)完成了普通隊(duì)列的實(shí)現(xiàn),加上阻塞功能,阻塞功能意味著,隊(duì)列要在多線程環(huán)境下使用 。保證多線程環(huán)境下,調(diào)用這里的 put 和 take 沒有問題的,
put 和 take 里面的每一行代碼都是在操作公共的變量。既然如此,直接就給整個方法加鎖即可
(加上 synchronized
已經(jīng)是線程安全的了)
// 入隊(duì)列
public void put(int value) {
// 此處是把 synchronized 包裹了方法里的所有代碼,其實(shí) synchronized 加到方法上,也是一樣的效果
synchronized (this) { // 針對同一個 MyBlockingQueue,進(jìn)行 put,take 操作時,會產(chǎn)生鎖競爭
if (size == items.length) {
return;
}
items[tail] = value;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
}
}
// 出隊(duì)列
public Integer take() {
int ret = 0;
synchronized (this) {
if (size == 0) {
return null;
}
ret = items[head];
head++;
if (head >= items.length) {
head = 0;
}
size--;
}
return ret;
}
阻塞隊(duì)列 - 阻塞
接下來,實(shí)現(xiàn)阻塞效果
關(guān)鍵要點(diǎn),使用 wait 和 notify
機(jī)制
對于 put 來說,阻塞條件,就是隊(duì)列為滿,對于 take 來說,阻塞條件,就是隊(duì)列為空
針對哪個對象加鎖就使用哪個對象 wait, 如果是針對 this 加鎖,就 this.wait
put 中的 wait 要由 take 來喚醒,只要 take 成功了一個元素,就隊(duì)列不滿了,就可以進(jìn)行喚醒了
對于 take 中的等待,條件是隊(duì)列為空,隊(duì)列不為空,也就是 put 成功之后,就來喚醒
當(dāng)前代碼中,put 和 take 兩種操作不會同時 wait (等待條件是截然不同的,一個是為空,一個是為滿)
如果有人在等待,notify 能喚醒,如果沒人等待,notify 沒有任何副作用
notify 只能喚醒隨機(jī)的一個等待的線程,不能做到精準(zhǔn)
要想精準(zhǔn),就必須使用不同的鎖對象
想喚醒 t1,就 o1.notify,讓 t1 進(jìn)行 o1.wait。想喚醒 t2,就 o2.notify,讓 t2 進(jìn)行 o2.wait
當(dāng) wait 被喚醒的時候,此時 if 的條件,一定就不成立了嘛?? 具體來說,put 中的 wait 被喚醒,要求,隊(duì)列不滿
但是 wait 被喚醒了之后,隊(duì)列一定是不滿的嘛?
注意,咱們當(dāng)前代碼中,確實(shí)不會出現(xiàn)這種情況,當(dāng)前代碼一定是取元素成功才喚醒,每次取元素都會喚醒
但是穩(wěn)妥起見,最好的辦法,是 wait 返回之后再次判定一下,看此時的條件是不是具備了!!
將 if 改為 while,標(biāo)準(zhǔn)庫就是建議這么寫的
while (size == items.length) {
// 隊(duì)列滿了,暫時先直接返回
// return;
this.wait();
}
while (size == 0) {
// 如果隊(duì)列為空,返回一個非法值
// return null;
this.wait();
}
代碼:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// 自己寫的阻塞隊(duì)列,此處不考慮泛型,直接使用 int 來表示元素類型了
class MyBlockingQueue {
// 保存數(shù)據(jù)的本體
private int[] items = new int[1000];
// 隊(duì)首下標(biāo)
private int head = 0;
// 隊(duì)尾下標(biāo)
private int tail = 0;
// 有效元素個數(shù)
private int size = 0;
// 入隊(duì)列
public void put(int value) throws InterruptedException {
synchronized (this) { // 針對同一個 MyBlockingQueue,進(jìn)行 put,take 操作時,會產(chǎn)生鎖競爭
while (size == items.length) {
// 隊(duì)列滿了,暫時先直接返回
// return;
this.wait();
}
// 2、把新的元素放入 tail 位置
items[tail] = value;
tail++;
// 3、處理 tail 到達(dá)數(shù)組末尾的情況
if (tail >= items.length) { // 判定 + 賦值 (雖然是兩個操作,兩個操作都是高效操作)
tail = 0;
}
// tail = tail % data.length; // 代碼可讀性差,除法速度不如比較,不利于開發(fā)效率也不利于運(yùn)行效率
// 4、插入完成,修改元素個數(shù)
size++;
// 如果入隊(duì)列成功,則隊(duì)列非空,喚醒 take 中的 wait
this.notify();
}
}
// 出隊(duì)列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (this) {
while (size == 0) {
// 如果隊(duì)列為空,返回一個非法值
// return null;
this.wait();
}
// 2、取出 head 位置的元素
ret = items[head];
head++;
// 3、head 到末尾 重新等于 0
if (head >= items.length) {
head = 0;
}
// 4、數(shù)組元素個數(shù)--
size--;
// take 成后,喚醒 put 中的 wait
this.notify();
}
return ret;
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
// 生產(chǎn)者消費(fèi)者模型
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
// 創(chuàng)建兩個線程,作為生產(chǎn)者和消費(fèi)者
Thread customer = new Thread(() -> {
while (true) {
try {
Integer result = blockingQueue.take();
System.out.println("消費(fèi)元素:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
Thread producer = new Thread(() -> {
int count = 0;
while (true) {
try {
blockingQueue.put(count);
System.out.println("生產(chǎn)元素:" + count);
count++;
Thread.sleep(500); // 每500毫秒生產(chǎn)一個
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
}
}
3、案例三:定時器
3.1、標(biāo)準(zhǔn)庫中的定時器 Timer
定時器也是軟件開發(fā)中的一個重要組件,類似于一個 “鬧鐘”,達(dá)到一個設(shè)定的時間之后,就喚醒并執(zhí)行之前設(shè)定好的任務(wù)
生活中鬧鐘,有兩種風(fēng)格:1.指定特定時刻,提醒。2.指定特定時間段之后,提醒
這里的定時器,不是提醒,是執(zhí)行一個實(shí)現(xiàn)準(zhǔn)備好的方法/代碼
定時器是一種實(shí)際開發(fā)中非常常用的組件
比如網(wǎng)絡(luò)通信中,很容易出現(xiàn) “連不上” 的情況,不能一直等,就可以使用定時器來進(jìn)行 “止損”,如果對方 500ms 內(nèi)沒有返回?cái)?shù)據(jù),則斷開連接嘗試重連
比如一個 Map,希望里面的某個 key 在 3s 之后過期(自動刪除)類似于這樣的場景就需要用到定時器
join
(指定超時時間),sleep
(休眠指定時間,是基于系統(tǒng)內(nèi)部的定時器,來實(shí)現(xiàn)的)
先介紹標(biāo)準(zhǔn)庫的定時器用法,然后再看看如何自己實(shí)現(xiàn)一個定時器
標(biāo)準(zhǔn)庫中提供了一個 Timer 類,Timer 類的核心方法為 schedule (安排),這個方法的效果是給定時器,注冊一個任務(wù),任務(wù)不會立即執(zhí)行,而是在指定時間進(jìn)行執(zhí)行
schedule 包含兩個參數(shù),第一個參數(shù)指定即將要執(zhí)行的任務(wù)代碼 (Runnable),第二個參數(shù)指定多長時間之后執(zhí)行 (單位為毫秒)
import java.util.Timer;
import java.util.TimerTask;
public class demo5 {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello time");
}
}, 3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello time2");
}
}, 2000);
System.out.println("main");
}
}
運(yùn)行結(jié)果:
首先打?。簃ain
幾秒后 打印:hello time2
然后打?。篽ello time
但是程序沒有結(jié)束
Timer 內(nèi)部是有專門的線程,來負(fù)責(zé)執(zhí)行注冊的任務(wù)的
Timer 內(nèi)部都需要:
- 管理很多的任務(wù)
- 執(zhí)行時間到了的任務(wù)
自己實(shí)現(xiàn)一個定時器:一個定時器是可以注冊 N 個任務(wù)的,N 個任務(wù)會按照最初約定的時間,按順序執(zhí)行
1). 有一個掃描線程,負(fù)責(zé)判定時間到/執(zhí)行任務(wù) (單獨(dú)在定時器內(nèi)部,搞個線程,讓這個線程周期性地掃描,判定任務(wù)是否是到時間了,如果到時間了,就執(zhí)行,沒到時間就再等等)
2). 還要有一個數(shù)據(jù)結(jié)構(gòu)(優(yōu)先級隊(duì)列),來保存所有被注冊的任務(wù)
3.2、描述任務(wù)
創(chuàng)建一個專門的類來表示一個定時器中的任務(wù) (TimerTask)
隊(duì)列中存放的任務(wù)就是 Runnable,Runnable 只是描述了任務(wù)內(nèi)容,還需要描述任務(wù)什么時候被執(zhí)行
// 創(chuàng)建一個類,表示一個任務(wù)
class MyTask {
// 任務(wù)具體要做什么
private Runnable runnable;
// 任務(wù)什么時候執(zhí)行 (任務(wù)要執(zhí)行的毫秒級時間戳)
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
// 獲取當(dāng)前任務(wù)時間
public long getTime() {
return time;
}
// 執(zhí)行任務(wù)
public void run() {
runnable.run();
}
}
3.2、組織任務(wù)
使用一定的數(shù)據(jù)結(jié)構(gòu)把一些任務(wù)給放到一起,通過一定的數(shù)據(jù)結(jié)構(gòu)來組織
假設(shè)現(xiàn)在有多個任務(wù)過來了—個小時之后,去做作業(yè),三個小時之后,去上課,10分鐘之后,去休息—會
安排任務(wù)的時候,這些任務(wù)的順序是無序的,但是執(zhí)行任務(wù)的時候,這就不是無序的了,按照時間先后來執(zhí)行!
咱們的需求就是,能夠快速找到所有任務(wù)中,時間最小的任務(wù)
此時我們發(fā)現(xiàn)可以用堆,在標(biāo)準(zhǔn)庫中,有一個專門的數(shù)據(jù)結(jié)構(gòu) PriorityQueue
咱們這里的每個任務(wù)都是帶個"時間"多久之后執(zhí)行,一定是時間越靠前,就先執(zhí)行
按照時間小的,作為優(yōu)先級高
此時隊(duì)首元素,就是整個隊(duì)列中,最先要執(zhí)行的任務(wù)
雖然隊(duì)列中的元素順序,不能完全確定,但是可以知道,隊(duì)首元素,一定是時間最靠前的
此時,掃描線程,只需要掃一下隊(duì)首元素即可,不必遍歷整個隊(duì)列
private PriorityQueue<> queue = new PriorityQueue<>();
但是此處的優(yōu)先級隊(duì)列要在多線程環(huán)境下使用,要考慮到線程安全問題,可能在多個線程里進(jìn)行注冊任務(wù),同時還有一個專門的線程來取任務(wù)執(zhí)行,此處的隊(duì)列就需要注意線程安全問題
所以我們得使用 PriorityBlockingQueue
,既帶有優(yōu)先級又帶有阻塞隊(duì)列
private PriorityBlockingQueue<> queue = new PriorityBlockingQueue<>();
// 自己寫個簡單的定時器
class MyTimer {
// 掃描線程
private Thread t = null;
// 定時器內(nèi)部要能夠存放多個任務(wù) 阻塞優(yōu)先級隊(duì)列保存任務(wù)
private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
public MyTimer() {
// TODO
}
/** 定時器提供一個 schedule 方法,注冊任務(wù)
* @param runnable 要執(zhí)行的任務(wù)
* @param after 多長時間(毫秒)之后執(zhí)行
*/
public void schedule(Runnable runnable, long after) {
MyTask task = new MyTask(runnable, System.currentTimeMillis() + after);
queue.put(task); // 任務(wù)放入堆
}
}
——執(zhí)行時間到了的任務(wù):
需要先執(zhí)行時間最考前的任務(wù)
就需要有一個掃描線程,不停地去檢查當(dāng)前優(yōu)先隊(duì)列的隊(duì)首元素,看看當(dāng)前最靠前的這個任務(wù)是不是時間到了
在定時器構(gòu)造方法中 創(chuàng)建線程進(jìn)行掃描
阻塞隊(duì)列,只能先把元素出隊(duì)列才好判定,不滿足還得放回去
這不像普通隊(duì)列,可以直接取隊(duì)首元素判定的
public MyTimer() {
t = new Thread(() -> {
while (true) {
try {
// 取出隊(duì)首元素,再比較這個任務(wù)有沒有到時間
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if (curTime < (myTask).getTime()) { // 1.沒到時間,任務(wù)放回堆
queue.put(myTask);
} else { // 2.時間到了,執(zhí)行任務(wù)
myTask.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
3.3、兩個缺陷
上述代碼中存在兩個非常嚴(yán)重的問題:
第—個缺陷: MyTask 沒有指定比較規(guī)則
像剛才咱們實(shí)現(xiàn)的 MyTask 這個類的比較規(guī)則,并不是默認(rèn)就存在的,這個需要咱們手動指定,按照時間大小來比較的
標(biāo)準(zhǔn)庫中的集合類,很多都是有一定的約束限制的,不是隨便拿個類都能放到這些集合類里面去的
——測試:
public class ThreadDemo {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("任務(wù)1");
}
}, 1000);
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("任務(wù)2");
}
}, 2000);
}
}
讓 MyTask 類實(shí)現(xiàn) Comparable接口,另外也可以使用 Comparator單獨(dú)寫個比較器
修改:
class MyTask implements Comparable<MyTask> {
@Override
public int compareTo(MyTask o) {
return (int) (this.time - o.time);
}
}
第二個缺陷: 如果不加任何限制,這個循環(huán)就會執(zhí)行的非???/font>
while (true) 轉(zhuǎn)的太快了, 造成了無意義的 CPU 浪費(fèi)
如果隊(duì)列中的任務(wù)是空著的,就還好,這個線程就再這里阻塞了 (沒問題)
就怕隊(duì)列中的任務(wù)不空,并且任務(wù)時間還沒到
上述操作,稱為 “忙等”,等確實(shí)是等了,但是又沒閑著。既沒有實(shí)質(zhì)性的工作產(chǎn)出,同時又沒有進(jìn)行休息
等待是要釋放 CPU 資源的。讓 CPU 干別的事情。但是忙等。既進(jìn)行了等待。又占用著CPU資源,忙等這種操作是非常浪費(fèi) CPU 的。
既然是指定一個等待時間,為啥不直接用 sleep
,而是要再用一下 wait 呢
sleep 不能被中途喚醒的,wait 能夠被中途喚醒
在等待過程中,可能要插入新的任務(wù)! 新的任務(wù)是可能出現(xiàn)在之前所有任務(wù)的最前面的,使用 sleep 可能會錯過新任務(wù)的執(zhí)行時間
可以基于 wait
這樣的機(jī)制來實(shí)現(xiàn)
wait 有一個版本,指定等待時間 (不需要 notify,時間到了自然喚醒),計(jì)算出當(dāng)前時間和任務(wù)的目標(biāo)之間的時間差,就等待這么長時間即可
在 schedule
操作中,就需要加上一個 notify 操作。使用 wait 等待,每次有新任務(wù)來了 (有人調(diào)用 schedule),就 notify 一下,重新檢查下時間,重新計(jì)算要等待的時間
這樣掃描線程既可以指定時間等待,也可以隨時喚醒。讓等待不占用 CPU,同時不錯過新任務(wù)
修改:
3.4、問題三:notify 空
代碼寫到這里,還有個很嚴(yán)重的問題,這個問題,還是和線程安全 / 隨機(jī)調(diào)度密切相關(guān)的
考慮一個極端情況:
假設(shè)代碼執(zhí)行到 put 這一行,這個線程就從 cpu 調(diào)度走了…
當(dāng)線程回來之后,接下來就要進(jìn)行 wait 操作,此時 wait 的時間已經(jīng)是算好了的
比如 curTime 是 13:00,任務(wù) getTime 是 14:00 即將要 wait 1小時 (此時還沒執(zhí)行 wait,因?yàn)榫€程在 put 就被調(diào)走了)
此時,另一個線程調(diào)用了 schedule 添加新任務(wù),新任務(wù)是 13:30 執(zhí)行
此處調(diào)用 schedule 會執(zhí)行 notify,通知 wait 喚醒
由于掃描線程 wait 還沒執(zhí)行呢!
所以,此處的 notify 不會產(chǎn)生任何的喚醒操作! 此時此刻,新的任務(wù)雖然已經(jīng)插入了隊(duì)列,新的任務(wù)也是在隊(duì)首緊接著掃描線程回到 cpu了,此時等待時間仍然是 1小時
因此,13:30 新的任務(wù),就被錯過了!
了解了上述問題之后,就不難發(fā)現(xiàn),問題出現(xiàn)的原因,是因?yàn)?mark>當(dāng)前 take 操作,和 wait 操作,并非是原子的
如果在 take 和 wait 之間加上鎖,保證在這個過程中,不會有新的任務(wù)過來,問題自然解決
(換句話說,只要保證每次 notify 時,確實(shí)都正在 wait)
修改:
此處只需要把鎖的范圍放大,放大之后,此時就可以保證執(zhí)行 notify 的時候,wait 是確實(shí)已經(jīng)執(zhí)行完了
就可以預(yù)防出現(xiàn) notify 的時候還沒有準(zhǔn)備好,wait這樣的情況了
代碼:
import java.util.concurrent.PriorityBlockingQueue;
// 創(chuàng)建一個類,表示一個任務(wù)
class MyTask implements Comparable<MyTask> {
// 任務(wù)具體要做什么
private Runnable runnable;
// 任務(wù)什么時候執(zhí)行 (任務(wù)要執(zhí)行的毫秒級時間戳)
private long time;
public MyTask(Runnable runnable, long time) {
this.runnable = runnable;
this.time = time;
}
// 獲取當(dāng)前任務(wù)時間
public long getTime() {
return time;
}
// 執(zhí)行任務(wù)
public void run() {
runnable.run();
}
@Override
public int compareTo(MyTask o) {
return (int) (this.time - o.time);
}
}
// 自己寫個簡單的定時器
class MyTimer {
// 掃描線程
private Thread t = null;
// 定時器內(nèi)部要能夠存放多個任務(wù) 阻塞優(yōu)先級隊(duì)列保存任務(wù)
private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
// 掃描線程
public MyTimer() {
t = new Thread(() -> {
while (true) {
try {
synchronized (this) {
// 取出隊(duì)首元素,再比較這個任務(wù)有沒有到時間
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if (curTime < (myTask).getTime()) { // 1.沒到時間,任務(wù)放回堆
queue.put(myTask);
// 在 put 后 wait
this.wait(myTask.getTime() - curTime);
} else { // 2.時間到了,執(zhí)行任務(wù)
myTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
/** 定時器提供一個 schedule 方法,注冊任務(wù)
* @param runnable 要執(zhí)行的任務(wù)
* @param after 多長時間(毫秒)之后執(zhí)行
*/
public void schedule(Runnable runnable, long after) {
// 注意換算,time 是一個時間戳,不是絕對的時間戳的值
MyTask task = new MyTask(runnable, System.currentTimeMillis() + after);
queue.put(task); // 任務(wù)放入堆
// 有新任務(wù)加入 notify
synchronized (this) {
this.notify();
}
}
}
public class ThreadDemo {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("任務(wù)1");
}
}, 1000);
myTimer.schedule(new Runnable() {
@Override
public void run() {
System.out.println("任務(wù)2");
}
}, 2000);
}
}
運(yùn)行結(jié)果:
任務(wù)1
任務(wù)2
總結(jié):
- 描述—個任務(wù): runnable + time
- 使用優(yōu)先阻塞隊(duì)列來組織若干個任務(wù),PriorityBlockingQueue
- 實(shí)現(xiàn) schedule 方法來注冊任務(wù)到隊(duì)列中
- 創(chuàng)建一個掃描線程這個掃描線程不停地獲取到隊(duì)首元素,并且判定時間是否到達(dá)
- 注意:讓 MyTask 類能夠支持比較,注意解決這里的忙等問題,notity 時 wait 沒有執(zhí)行問題
4、案例四:線程池
4.1、用戶態(tài) / 內(nèi)核態(tài)
進(jìn)程,比較重,頻繁創(chuàng)建銷毀,開銷大
解決方案:進(jìn)程池 or 線程
線程 (輕量級進(jìn)程),雖然比進(jìn)程輕了,創(chuàng)建線程比創(chuàng)建進(jìn)程更高效;銷毀線程比銷毀進(jìn)程更高效;調(diào)度線程比調(diào)度進(jìn)程更高效…但是如果創(chuàng)建銷毀的頻率進(jìn)一步增加,仍然會發(fā)現(xiàn)開銷還是有的
解決方案:線程池 or 協(xié)程/纖程 (還沒有被加入 Java 標(biāo)準(zhǔn)庫。Go 內(nèi)置了協(xié)程,因此使用 Go 開發(fā)并發(fā)編程程序是有一定優(yōu)勢的)
使用線程池,來降低創(chuàng)建/銷毀線程的開銷
把線程提前創(chuàng)建好,放到池子里
1.后面需要用線程,直接從池子里取,就不必從系統(tǒng)這邊申請了。線程用完了,也不是還給系統(tǒng),而是2.放回池子里,以備下次再用
這兩個動作比創(chuàng)建/銷毀更高效的
——為森么線程放在池子里,就比從系統(tǒng)這邊申請釋放來的更快呢?
程序中的“用戶態(tài)”,
用戶態(tài)執(zhí)行的是程序猿自己寫的代碼,就在最上面的應(yīng)用程序這一層來運(yùn)行的。這里的代碼都稱為 “用戶態(tài)” 運(yùn)行的代碼。
程序中的"內(nèi)核態(tài)",
內(nèi)核會給程序提供一些 API,稱為系統(tǒng)調(diào)用,有些代碼,需要調(diào)用操作系統(tǒng)的 API,進(jìn)一步的邏輯就會在內(nèi)核中執(zhí)行,內(nèi)核態(tài)進(jìn)行的操作都是在操作系統(tǒng)內(nèi)核中完成的。
例如,調(diào)用一個 System.out.println。本質(zhì)上要經(jīng)過 write 系統(tǒng)調(diào)用,進(jìn)入到內(nèi)核中,內(nèi)核執(zhí)行—堆邏輯,控制顯示器輸出字符串…
在內(nèi)核中運(yùn)行的代碼,稱為 “內(nèi)核態(tài)” 運(yùn)行的代碼。
創(chuàng)建/銷毀線程,需要操作系統(tǒng)內(nèi)核完成 (創(chuàng)建線程本質(zhì)是在內(nèi)核中搞個PCB,加到鏈表里)
調(diào)用的 Thread.start 其實(shí)歸根結(jié)底,也是要進(jìn)入內(nèi)核態(tài)來運(yùn)行。
此時你不清楚內(nèi)核身上背負(fù)著多少任務(wù) (內(nèi)核不是只給你一個應(yīng)用程序服務(wù),給所有的程序都要提供服務(wù))
因此,當(dāng)使用系統(tǒng)調(diào)用,執(zhí)行內(nèi)核代碼的時候,無法確定內(nèi)核都要做哪些工作,整體過程 "不可控” 的
而把創(chuàng)建好的線程放到" 池子里",由于池子就是用戶態(tài)實(shí)現(xiàn)的
這個放到池子 / 從池子取,這個過程不需要涉及到內(nèi)核態(tài),就是純粹的用戶態(tài)代碼就能完成
一般認(rèn)為,純用戶態(tài)的操作,效率要比經(jīng)過內(nèi)核態(tài)處理的操作,要效率更高。
例如:滑稽老鐵去銀行處理業(yè)務(wù),柜員說需要省份證復(fù)印件
1、滑稽老鐵,自己來到大廳的復(fù)印機(jī)這里進(jìn)行復(fù)印。純用戶態(tài)的操作。(完全自己完成的,整體的過程可控)
2、滑稽老鐵,把身份證給柜員,讓柜員去幫他復(fù)印,這個過程就相當(dāng)于交給了內(nèi)核態(tài)完成一些工作。(不是自己完成的,整體不可控的)
咱們也不知道柜員身上有多少任務(wù)。可能從柜臺消失之后,是給你復(fù)印去了。
但是他可能還會順手做一些其他的事情。數(shù)一下錢 / 清點(diǎn)一下票據(jù) / 上個廁所 / 回個消息…
認(rèn)為內(nèi)核態(tài)效率低,倒不是說—定就真的低。而是代碼進(jìn)入了內(nèi)核態(tài),就不可控了。
內(nèi)核什么時候給你把活干完,把結(jié)果給你。(有的時候快,有的時候慢)
4.2、標(biāo)準(zhǔn)庫中的線程池 ThreadPoolExecutor
ThreadPoolExecutor
先學(xué)習(xí)—下 Java 標(biāo)準(zhǔn)庫中,線程池的使用,然后再自己實(shí)現(xiàn)一個線程池
標(biāo)準(zhǔn)庫的線程池叫做 ThreadPoolExecutor
這個東西用起來有點(diǎn)麻煩
在 java.util.concurrent (concurrent 并發(fā)) 下,
Java 中很多和多線程相關(guān)的組件都在這個 concurrent 包里
——構(gòu)造方法:
(針對 ThreadPoolExecutor 這里的構(gòu)造方法參數(shù)的解釋,是高頻考點(diǎn),重點(diǎn)掌握!!!)
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Creates a new ThreadPoolExecutor with the given initial parameters.
// 創(chuàng)建一個新 ThreadPoolExecutor 給定的初始參數(shù)
int corePoolSize
核心線程數(shù) (正式員工的數(shù)量)
int maximumPoolSize
最大線程數(shù) (正式員工 + 臨時工)
把一個線程池,想象成是一個"公司",公司里有很多員工在干活
把線程(員工)分成兩類:
1、正式員工(核心線程),正式員工允許摸魚
2、臨時工,臨時工不允許摸魚
開始的時候,假設(shè)公司要完成的工作不多,正式員工完全就能搞定,就不需要臨時工。
如果公司的任務(wù)突然猛增了,正式員工加班也搞不定了,就需要雇傭一批臨時工 (更多的線程)
但是一個程序任務(wù)不一定始終都很多,過了一段時間,工作量又降低了,現(xiàn)在的活正式員工也就能搞定了,甚至還有富裕 (正式員工可以摸魚了) 臨時工就更摸魚了,就需要對現(xiàn)有的線程(臨時工)進(jìn)行一定的淘汰
整體的策略,正式員工保底,臨時工動態(tài)調(diào)節(jié)
long keepAliveTime
允許臨時工摸魚的時間
TimeUnit unit
時間的單位 (s, ms, us…)
BlockingQueue<Runnable> workQueue,
任務(wù)隊(duì)列
線程池會提供一個 submit
方法讓程序猿把任務(wù)注冊到線程池中,加到這個任務(wù)隊(duì)列中
每個工作線程都是再不停嘗試 take 的,如果有任務(wù),take 成功,沒有,就阻塞。
ThreadFactory threadFactory ,
線程工廠類,用于創(chuàng)建線程,線程池是需要創(chuàng)建線程的
RejectedExecutionHandler handler
描述了線程池的 拒絕策略,也是一個特殊的對象,描述了當(dāng)線程池任務(wù)隊(duì)列滿了,如果繼續(xù)添加任務(wù)會有什么樣的行為…
以下是標(biāo)準(zhǔn)庫提供的四個拒絕策略:
- 直接拋異常 RejectedExecutionException
- 多出來的任務(wù),誰加的,誰負(fù)責(zé)執(zhí)行
- 直接丟棄最老的任務(wù)
- 丟棄最新的任務(wù)
比如我現(xiàn)在有很多任務(wù)要完成,突然有人給我來了個新的活,但是我已經(jīng)非常忙,任務(wù)隊(duì)列已經(jīng)滿了,導(dǎo)致我 CPU 燒了,新的活干不了 (1)
我說,我沒空,你自己干吧 (2)
放下手里的工作,去做新的活 (3)
拒絕新的活,還是做原有的工作 (4)
線程池中線程的個數(shù):
雖然線程池的參數(shù)這么多,但是使用的時候最最重要的參數(shù),還是第一組參數(shù),線程池中線程的個數(shù)
——有一個程序,這個程序要 并發(fā)的/多線程的 來完成一些任務(wù),如果使用線程池的話,這里的線程數(shù)設(shè)為多少合適? [不僅僅是面試題,也是工作中需要思考的話題]
針對這個問題,網(wǎng)上的很多說法,是不正確的!
網(wǎng)上一種典型的回答:假設(shè)機(jī)器有 N 核CPU,線程池的線程數(shù)目,就設(shè)為 N(CPU 的核數(shù)),N + 1,1.2N,1.5N, 2N…
只要能回答出一個具體的數(shù)字,都—定是錯的!
不同的程序特點(diǎn)不同,此時要設(shè)置的線程數(shù)也是不同的,
考慮兩個極端情況:
-
CPU 密集型
每個線程要執(zhí)行的任務(wù)都是狂轉(zhuǎn) CPU (進(jìn)行一系列算術(shù)運(yùn)算)
此時線程池線程數(shù),最多也不應(yīng)該超過 CPU 核數(shù)
此時如果你設(shè)置的更大,也沒用
CPU 密集型任務(wù),要一直占用 CPU,搞那么多線程,但是 CPU 的坑不夠了… -
IO 密集型
每個線程干的工作就是等待 IO (讀寫硬盤,讀寫網(wǎng)卡,等待用戶輸入) ——不吃CPU
此時這樣的線程處于阻塞狀態(tài),不參與 CPU 調(diào)度…
這個時候多搞一些線程都無所謂, 不再受制于 CPU 核數(shù)了
理論上來說你線程數(shù)設(shè)置成無窮大都可以 (實(shí)際上當(dāng)然是不行的)
然而,我們實(shí)際開發(fā)中并沒有程序符合這兩種理想模型… 真實(shí)的程序,往往一部分要吃 CPU,一部分要等待 IO
具體這個程序幾成工作量是吃 CPU 的,幾成工作量是等待 IO,不確定…
實(shí)踐中確定線程數(shù)量:通過性能測試的方式,找到合適的值
例如,寫一個服務(wù)器程序,服務(wù)器里通過線程池,多線程的處理用戶請求,就可以對這個服務(wù)器進(jìn)行性能測試,
比如構(gòu)造一些請求,發(fā)送給服務(wù)器,要測試性能,這里的請求就需要構(gòu)造很多,比如每秒發(fā)送 500 / 1000 / 2000…根據(jù)實(shí)際的業(yè)務(wù)場景,構(gòu)造一個合適的值
根據(jù)這里不同的線程池的線程數(shù),來觀察,程序處理任務(wù)的速度,程序持有的 CPU 的占用率,
當(dāng)線程數(shù)多了,整體的速度是會變快,但是 CPU 占用率也會高
當(dāng)線程數(shù)少了,整體的速度是會變慢,但是 CPU 占用率也會下降
需要找到一個讓程序速度能接受,并且CPU占用也合理這樣的平衡點(diǎn)
不同類型的程序,因?yàn)閱蝹€任務(wù),里面 CPU 上計(jì)算的時間和阻塞的時間是分布不相同的
因此隨意想出來一個數(shù)字往往是不靠譜
搞了多線程,就是為了讓程序跑的更快嘛,為啥要考慮不讓CPU占用率太高呢?
對于線上服務(wù)器來說,要留有一定的冗余!隨時應(yīng)對一些可能的突發(fā)情況!(例如請求突然暴漲)
如果本身已經(jīng)把 CPU 快占完了,這時候突然來—波請求的峰值,此時服務(wù)器可能直接就掛了
Executors
ThreadPoolExecutor 這個線程池用起來更麻煩一點(diǎn)(提供的功能更強(qiáng)大),所以才提供了工廠類,讓我們用著更簡單
標(biāo)準(zhǔn)庫中提供了一個簡化版本的線程池 Executors
本質(zhì)是針對 ThreadPoolExecutor
進(jìn)行了封裝,提供了一些默認(rèn)參數(shù)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class demo6 {
public static void main(String[] args) {
// 創(chuàng)建一個固定的線程數(shù)目的線程池,參數(shù)指定了線程的個數(shù)
ExecutorService pool = Executors.newFixedThreadPool(10);
// 創(chuàng)建一個自動擴(kuò)擴(kuò)容的線程池,線程數(shù)量動態(tài)變化,會根據(jù)任務(wù)量自動擴(kuò)容
Executors.newCachedThreadPool();
// 創(chuàng)建一個只有一個線程的線程池
Executors.newSingleThreadExecutor();
// 創(chuàng)建一個帶有定時器功能的線程池,類似于 Timer,只不過執(zhí)行的時候不是由掃描線程自己執(zhí)行,而是由單獨(dú)的線程池來執(zhí)行
Executors.newScheduledThreadPool(10);
}
}
——使用 Executors:
構(gòu)造出一個 10 個線程的線程池
線程池提供了一個重要的方法 submit 可以給線程池提交若干個任務(wù)
把 Runnable 描述的任務(wù)提交到線程池里,此時 run 方法不是主線程調(diào)用,是由線程池中的 10 個線程中的一個調(diào)用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class demo6 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello threadPool!");
}
});
}
}
運(yùn)行結(jié)果:
hello threadPool!
運(yùn)行程序之后發(fā)現(xiàn),main 線程結(jié)束了,但是整個進(jìn)程沒結(jié)束,線程池中的線程都是前臺線程,此時會阻止進(jìn)程結(jié)束 (前面定時器 Timer 也是同理)
——循環(huán)提交 1000 個任務(wù):
public class ThreadDemo2 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello pool! " + n);
}
});
}
}
}
此處要注意,當(dāng)前是往線程池里放了 1000 個任務(wù)
1000 個任務(wù)就是由這 10 個線程來平均分配一下,差不多是一人執(zhí)行 100 個,但是注意這里并非是嚴(yán)格的平均,可能有的多一個有的少一個,都正常 (隨機(jī)調(diào)度)
(每個線程都執(zhí)行完一個任務(wù)之后,再立即取下一個任務(wù)… 由于每個任務(wù)執(zhí)行時間都差不多,因此每個線程做的任務(wù)數(shù)量就差不多)
進(jìn)一步的可以認(rèn)為,這 1000 個任務(wù),就在一個隊(duì)列中排隊(duì)呢
這 10 個線程,就依次來取隊(duì)列中的任務(wù),取一個就執(zhí)行一個,執(zhí)行完了之后再執(zhí)行下一個
工廠模式
ExecutorService pool = Executors.newFixedThreadPool(10);
此處 new 是方法名字的一部分,不是 new 關(guān)鍵字
這個操作,使用某個類的某個靜態(tài)方法,直接構(gòu)造出一個對象來 (相當(dāng)于是把 new 操作,給隱藏到這樣的方法后面了)
像這樣的方法,就稱為“工廠方法”
提供這個工廠方法的類,也就稱為"工廠類",此處這個代碼就使用了“工廠模式",這種設(shè)計(jì)模式
工廠模式:—句話表示,使用普通的方法,來代替構(gòu)造方法,創(chuàng)建對象
為啥要代替?構(gòu)造方法有坑!!!
坑就體現(xiàn)在,只構(gòu)造一種對象,好辦
如果要構(gòu)造多種不同情況的對象,就難搞了…
——舉個栗子:
有個類,用多種方法構(gòu)造平面上的一個點(diǎn)
class Point {
// 使用笛卡爾坐標(biāo)系提供的坐標(biāo),來構(gòu)造點(diǎn)
public Point(double x, double y) {}
// 使用極坐標(biāo),來構(gòu)造點(diǎn)
public Point(double r, double a) {}
}
很明顯,這個代碼有問題!!! 正常來說,多個構(gòu)造方法
是通過"重載”的方式來提供的
重載要求的是,方法名相同,參數(shù)的個數(shù)或者類型不相同
而上述兩個方法,方法名相同,參數(shù)個數(shù)相同,參數(shù)類型相同,無法構(gòu)成重載,在 Java 上無法正確編譯
為了解決這個問題,就可以使用工廠模式:
class PointFactory {
public static Point makePointByXY(double x, double y) {}
public static Point makePointByRA(double r, double a) {}
}
Point p = PointFactory.makePointByXY(10,20);
普通方法,方法名字沒有限制的
因此有多種方式構(gòu)造,就可以直接使用不同的方法名即可,此時,方法的參數(shù)是否要區(qū)分,已經(jīng)不重要了
很多時候,設(shè)計(jì)模式,是在規(guī)避編程語言語法上的坑
不同的語言,語法規(guī)則不一樣,因此在不同的語言上,能夠使用的設(shè)計(jì)模式,可能會不同,有的設(shè)計(jì)模式,已經(jīng)被融合在語言的語法內(nèi)部了…
咱們?nèi)粘U劦降脑O(shè)計(jì)模式,主要是基于 C++/Java/C# 這樣語言來展開的,這里所說的設(shè)計(jì)模式不一定適合其他語言
像工廠模式,對于 Python 來說沒什么價值,Python 構(gòu)造方法,不像C++/Java 的這么坑,可以直接在構(gòu)造方法中通過其他手段來做出不同版本的區(qū)分
——不能直接使用 i 的原因:
Lambda 變量捕獲
很明顯,此處的 run 方法屬于 Runnable,這個方法的執(zhí)行時機(jī),不是立刻馬上
而是在未來的某個節(jié)點(diǎn) (后續(xù)在線程池的隊(duì)列中,排到他了,就讓對應(yīng)的線程去執(zhí)行)
fori 循環(huán)中的 i,這是主線程里的局部變量 (在主線程的棧上),隨著主線程這里的代碼塊執(zhí)行結(jié)束就銷毀了
很可能主線程這里 for 執(zhí)行完了,當(dāng)前 run 的任務(wù)在線程池里還沒排到呢,此時 i 就已經(jīng)要銷毀了
為了避免作用域的差異,導(dǎo)致后續(xù)執(zhí)行 run 的時候 i 已經(jīng)銷毀,
于是就有了變量捕獲,也就是讓 run 方法把剛才主線程的 i 給往當(dāng)前 run 的棧上拷貝一份…
(在定義 run 的時候,偷偷把 i 當(dāng)前的值記住
后續(xù)執(zhí)行 run 的時候,就創(chuàng)建一個也叫做 i 的局部變量,并且把這個值賦值過去…)
在 Java 中,對于變量捕獲,做了一些額外的要求
在 JDK 1.8 之前,要求變量捕獲,只能捕獲 final 修飾的變量,后來發(fā)現(xiàn),這么搞太麻煩了
在 1.8 開始,放松了一點(diǎn)標(biāo)準(zhǔn),要求不一定非得帶 final 關(guān)鍵字,只要代碼中沒有修改這個變量,也可以捕獲
此處,i 是有修改的,不能捕獲的
而n是沒有修改的,雖然沒有 final 修飾,但是也能捕獲了
C++, JS 也有類似的變量捕獲的語法,但是沒有上述限制…文章來源:http://www.zghlxwxcb.cn/news/detail-607437.html
4.3、實(shí)現(xiàn)一個線程池
線程池里面有:文章來源地址http://www.zghlxwxcb.cn/news/detail-607437.html
- 先能夠描述任務(wù) (直接使用 Runnable)
- 需要組織任務(wù) (直接使用 BlockingQueue)
- 能夠描述工作線程
- 還需要組織這些線程
- 需要實(shí)現(xiàn),往線程池里添加任務(wù)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// 實(shí)現(xiàn)一個固定線程數(shù)的線程池
class MyThreadPool {
// 1、描述一個任務(wù),不像定時器涉及"時間",直接用 Runnable,不需要額外類
// 2、使用一個數(shù)據(jù)結(jié)構(gòu)(阻塞隊(duì)列)來組織若干個任務(wù)
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// 在構(gòu)造方法中,創(chuàng)建若干個線程 (n 表示線程的數(shù)量)
public MyThreadPool(int n) {
// 在這里創(chuàng)建線程
for (int i = 0; i < n; i++) {
Thread t = new Thread(() -> {
while (true) { // 從隊(duì)列中循環(huán)地取任務(wù)
try {
// 循環(huán)地獲取任務(wù)隊(duì)列中的任務(wù),然后執(zhí)行
// 隊(duì)列為空,直接阻塞。隊(duì)列非空,就獲取內(nèi)容
Runnable runnable = queue.take(); // 獲取任務(wù)
runnable.run(); // 執(zhí)行任務(wù)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
}
// 創(chuàng)建一個方法,能夠允許程序員放任務(wù)到線程池中
// 注冊任務(wù)給線程池,由這 10 個線程執(zhí)行
public void submit(Runnable runnable) {
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class TestDemo {
public static void main(String[] args) {
MyThreadPool pool = new MyThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello " + n);
}
});
}
}
}
到了這里,關(guān)于多線程案例 | 單例模式、阻塞隊(duì)列、定時器、線程池的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!