多線程案例
阻塞隊(duì)列
概念
阻塞隊(duì)列是一種特殊的隊(duì)列. 也遵守 “先進(jìn)先出” 的原則.
阻塞隊(duì)列能是一種線程安全的數(shù)據(jù)結(jié)構(gòu), 并且具有以下特性:
- 當(dāng)隊(duì)列滿的時(shí)候, 繼續(xù)入隊(duì)列就會(huì)阻塞, 直到有其他線程從隊(duì)列中取走元素.
- 當(dāng)隊(duì)列空的時(shí)候, 繼續(xù)出隊(duì)列也會(huì)阻塞,直到有其他線程往隊(duì)列中插入元素.
阻塞隊(duì)列的一個(gè)典型應(yīng)用場(chǎng)景就是 “生產(chǎn)者消費(fèi)者模型”. 這是一種非常典型的開發(fā)模型.
生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模式就是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。
生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等。待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取.
- 阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力
- 阻塞隊(duì)列也能使生產(chǎn)者和消費(fèi)者之間 解耦.
消息隊(duì)列:特殊的隊(duì)列,相當(dāng)于在阻塞隊(duì)列的基礎(chǔ)上,加了一個(gè)“消息的類型”。按照指定類別進(jìn)行先進(jìn)先出。
在以上場(chǎng)景中:此時(shí)A把請(qǐng)求轉(zhuǎn)發(fā)給B,B處理完之后把結(jié)果反饋給A。此時(shí)就可以視為是A調(diào)用了B。此時(shí)A和B之間的耦合是比較高的。如果B出現(xiàn)問(wèn)題了,那么A也有可能會(huì)出現(xiàn)問(wèn)題。若此時(shí)再加一個(gè)服務(wù)器C,就要重新修改A的代碼,在此過(guò)程中,很容易出現(xiàn)bug。
針對(duì)以上場(chǎng)景,使用生產(chǎn)者消費(fèi)者模型就可以有效的降低耦合。
此時(shí)A和B之間的耦合就大大降低了。
A不知道B,A只知道隊(duì)列。同理B不知道A,B也只知道隊(duì)列。AB任何一個(gè)出bug了對(duì)另一個(gè)的影響是非常小的。
我們來(lái)寫一個(gè)生產(chǎn)者消費(fèi)者模型:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadDemo22 {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
//創(chuàng)建兩個(gè)線程,來(lái)作為生產(chǎn)者和消費(fèi)者
Thread customer = new Thread(()->{
while(true){
try {
Integer result = blockingQueue.take();
System.out.println("消費(fèi)元素:"+result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
customer.start();
Thread producer = new Thread(()->{
int count = 0;
while (true){
try {
blockingQueue.put(count);
System.out.println("生產(chǎn)元素:"+ count);
count++;
//控制500ms生產(chǎn)一個(gè)元素
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
}
}
標(biāo)準(zhǔn)庫(kù)中的阻塞隊(duì)列
public interface BlockingQueue<E> extends Queue<E> {...};
BlockingQueue 是一個(gè)接口. 真正實(shí)現(xiàn)的類是 LinkedBlockingQueue.
put 方法用于阻塞式的入隊(duì)列, take 用于阻塞式的出隊(duì)列.
BlockingQueue 也有 offer, poll, peek 等方法, 但是這些方法不帶有阻塞特性
自己實(shí)現(xiàn)一個(gè)阻塞隊(duì)列
import java.util.concurrent.BlockingQueue;
//自己實(shí)現(xiàn)一個(gè)阻塞隊(duì)列:
class MyBlockingQueue{
private int[] items = new int[1000];
private int head = 0;
private int tail = 0;
private int size = 0;
//入隊(duì)列
public void put(int value) throws InterruptedException {
synchronized (this) {
while (size == items.length) {
//隊(duì)列滿了,此時(shí)要產(chǎn)生阻塞
this.wait();
}
items[tail] = value;
tail++;
if (tail >= items.length) {
tail = 0;
}
size++;
//喚醒 take 中的wait
this.notify();
}
}
//出隊(duì)列
public Integer take() throws InterruptedException {
int result = 0;
synchronized (this){
while(size == 0){
this.wait();
}
result = items[head];
head++;
if(head >= items.length){
head = 0;
}
size--;
//喚醒 put中的wait
this.notify();
}
return result;
}
}
public class ThreadDemo23 {
public static void main(String[] args) throws InterruptedException {
/*MyBlockingQueue queue = new MyBlockingQueue();
queue.put(1);
queue.put(2);
queue.put(3);
queue.put(4);
int result = 0;
result = queue.take();
System.out.println("result = " + result);
result = queue.take();
System.out.println("result = " + result);
result = queue.take();
System.out.println("result = " + result);
result = queue.take();
System.out.println("result = " + result);*/
MyBlockingQueue queue = new MyBlockingQueue();
Thread customer = new Thread(()->{
while(true){
try {
Integer result = queue.take();
System.out.println("消費(fèi)元素:"+result);
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
customer.start();
Thread produce = new Thread(()->{
int count = 0;
while(true){
System.out.println("生產(chǎn)元素:"+count);
try {
queue.put(count);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
count++;
}
});
produce.start();
}
}
定時(shí)器
概念
定時(shí)器:指定特定時(shí)間段之后執(zhí)行一個(gè)事先準(zhǔn)備好的方法/代碼。
定時(shí)器第軟件開發(fā)中的一個(gè)重要組件。尤其是在網(wǎng)絡(luò)編程的時(shí)候。類似于一個(gè)“鬧鐘”。達(dá)到一個(gè)設(shè)定的時(shí)間之后, 就執(zhí)行某個(gè)指定好的代碼.
標(biāo)準(zhǔn)庫(kù)中的定時(shí)器
標(biāo)準(zhǔn)庫(kù)中提供了一個(gè) Timer 類. Timer 類的核心方法為 schedule .
schedule 包含兩個(gè)參數(shù). 第一個(gè)參數(shù)指定即將要執(zhí)行的任務(wù)代碼, 第二個(gè)參數(shù)指定多長(zhǎng)時(shí)間之后 執(zhí)行 (單位為毫秒).
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("運(yùn)行定時(shí)器任務(wù)1!");
}
},3000);
timer.schedule();
這個(gè)方法的效果是給定時(shí)器注冊(cè)一個(gè)任務(wù),任務(wù)不會(huì)立即執(zhí)行,而是在指定時(shí)間進(jìn)行執(zhí)行。
實(shí)現(xiàn)定時(shí)器
- 讓被注冊(cè)的任務(wù),能夠在指定時(shí)間內(nèi)被執(zhí)行
做法:?jiǎn)为?dú)在定時(shí)器內(nèi)部搞一個(gè)線程,讓這個(gè)線程周期性的掃描**(掃描線程)**,判定任務(wù)是否時(shí)間到。 - 一個(gè)定時(shí)器是可以注冊(cè)N個(gè)任務(wù)的**(schedule線程)**,N個(gè)任務(wù)會(huì)按照最初約定的時(shí)間,按順序執(zhí)行。
這N個(gè)任務(wù),就需要使用帶有優(yōu)先級(jí)的阻塞隊(duì)列來(lái)保存。(帶有優(yōu)先級(jí)是因?yàn)榭梢园凑諘r(shí)間小的作為優(yōu)先級(jí)高的,此時(shí)隊(duì)首元素就是整個(gè)隊(duì)列中,最先要執(zhí)行的任務(wù)。此時(shí),上述掃描線程只需要查看一下隊(duì)首元素即可,不需要遍歷整個(gè)隊(duì)列。(效率大大提高))
在這里插入代碼片import java.util.concurrent.PriorityBlockingQueue;
//使用這個(gè)類表示一個(gè)定時(shí)器中的任務(wù)
class MyTask implements Comparable<MyTask>{
//要執(zhí)行的任務(wù)內(nèi)容
private Runnable runnable;
//任務(wù)在何時(shí)執(zhí)行
private long time;
public MyTask(Runnable runnable,long time){
this.runnable = runnable;
this.time = time;
}
//獲取當(dāng)前任務(wù)的時(shí)間
public long getTime() {
return time;
}
//執(zhí)行任務(wù)
public void run(){
runnable.run();
}
@Override
public int compareTo(MyTask o) {
//重寫方法,按照時(shí)間排序。隊(duì)首元素是時(shí)間最小的任務(wù)。
return (int)(this.time-o.time);
}
}
//定時(shí)器
class MyTimer{
//掃描線程
private Thread t = null;
//使用一個(gè)優(yōu)先級(jí)隊(duì)列。來(lái)保存任務(wù)
private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>();
//使用這個(gè)對(duì)象來(lái)進(jìn)行加鎖/等待通知
private Object locker = new Object();
public MyTimer(){
t = new Thread(){
public void run() {
while (true) {
try {
//取出隊(duì)首元素,檢查看看隊(duì)首元素任務(wù)是否時(shí)間到了
//如果時(shí)間沒到,就把任務(wù)塞回隊(duì)列中
//如果時(shí)間到了,就把任務(wù)進(jìn)行執(zhí)行
synchronized (locker) {
MyTask myTask = queue.take();
long curTime = System.currentTimeMillis();
if (curTime < myTask.getTime()) {
//時(shí)間沒到,塞回隊(duì)列
queue.put(myTask);
//在put之后,進(jìn)行一個(gè)wait
locker.wait(myTask.getTime() - curTime);
} else {
//時(shí)間到了,執(zhí)行任務(wù)
myTask.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t.start();
}
public void schedule(Runnable runnable,long after){
MyTask task = new MyTask(runnable,System.currentTimeMillis()+after);
queue.put(task);
synchronized (locker) {
locker.notify();
}
}
}
public class ThreadDemo25 {
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
myTimer.schedule(new Runnable(){
public void run(){
System.out.println("任務(wù)1");
}
},2000);
myTimer.schedule(new Runnable(){
public void run(){
System.out.println("任務(wù)2");
}
},1000);
}
}
運(yùn)行結(jié)果:
線程池
線程池存在的意義:使用進(jìn)程來(lái)實(shí)現(xiàn)并發(fā)編程。太重了。此時(shí)引入了線程,線程也是叫做“輕量級(jí)進(jìn)程”。創(chuàng)建線程比創(chuàng)建進(jìn)程更加高效;銷毀線程比銷毀進(jìn)程更高效;調(diào)度線程比調(diào)度進(jìn)程更高效。此時(shí),使用多線程就可以在很多時(shí)候代替進(jìn)程來(lái)實(shí)現(xiàn)并發(fā)編程了。但是隨著并發(fā)程度的提高,隨著我們對(duì)于性能要求標(biāo)準(zhǔn)的提高。線程變得也沒有那么輕量。
當(dāng)我們想要進(jìn)一步提高效率,有兩種方式:
- 輕量級(jí)線程——協(xié)程/纖程
- 使用線程池,來(lái)降低創(chuàng)建/銷毀線程的開銷。
即事先把需要使用的線程創(chuàng)建好,放到池中。后面需要使用的時(shí)候,直接從池中獲取。用完了,就放在池中。
以上操作,比創(chuàng)建/銷毀更高效。
創(chuàng)建/銷毀線程,是由操作系統(tǒng)內(nèi)核完成的。從池子中獲取還給池,是我們自己代碼就能實(shí)現(xiàn)的,不必交給內(nèi)核操作。
標(biāo)準(zhǔn)庫(kù)中的線程池
ExecutorService pool = Executors.newFixedThreadPool(10);
上述操作,使用某個(gè)類的某個(gè)靜態(tài)方法,直接構(gòu)造出一個(gè)對(duì)象。(隱藏new)
這樣的方法,就稱為“工廠方法”。
提供這個(gè)工廠方法的類,就叫做“工廠類”此處這個(gè)代碼就使用了工廠模式這種設(shè)計(jì)模式。
工廠模式
工廠模式:使用普通的方法,來(lái)代替構(gòu)造方法,創(chuàng)建對(duì)象。
例如:
現(xiàn)在有一個(gè)類:
class Point{
public Point(double X,double Y){};
public Point(double R,double A){};
}
此時(shí)代碼出錯(cuò),因?yàn)闃?gòu)造方法只能有一個(gè)。但是我們想要實(shí)現(xiàn)兩種表示做表的方式。那么這種情況下我們就可以使用工廠模式。
class PointFactory{
public static Point makePointXY(double X,double Y){};
public static Point makePointRA(double R,double A){};
}
此時(shí)就解決了上述問(wèn)題。
線程池中提供的方法:submit;
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello"+n);
}
});
}
}
我們需要注意的是上述代碼中的變量捕獲問(wèn)題。 也就是為什么在打印的時(shí)候要單獨(dú)創(chuàng)建出一個(gè)變量n,不用i?
i是主線程中的局部變量(在主線程的棧上)。隨著主線程這里代碼塊執(zhí)行結(jié)束就銷毀了。為了避免作用域的差異,導(dǎo)致后續(xù)執(zhí)行run的時(shí)候i已經(jīng)銷毀了。于是就有了變量捕獲,也就是讓run方法將剛才主線程的i往run的棧上拷貝一份。變量捕獲只能捕獲final修飾的變量(JDK1.8之前),JDK1.8之后放松標(biāo)準(zhǔn),只要代碼中沒有修改這個(gè)變量,也可以捕獲。
上述代碼中,i是有修改的,是不能捕獲的。而n是沒有修改的,雖然沒有final修飾,但是也能捕獲。
ThreadPoolExecutor();構(gòu)造方法參數(shù)詳解(重點(diǎn))
為了便于理解這里參數(shù)之間的關(guān)系, 我們使用生活中的例子來(lái)類比理解, 假設(shè)這里有一家公司:
- corePoolSize表示核心線程數(shù), 公司的正式員工.
那核心線程數(shù)最合適值是多少呢? 假設(shè)CPU有N核心, 最適核心線程數(shù)是N? 是2N? 是1.5N? 只要你能夠說(shuō)出一個(gè)具體的數(shù), 那就錯(cuò)了,
最適的核心線程數(shù)要視情況和業(yè)務(wù)場(chǎng)景而定, 沒有一個(gè)絕對(duì)的標(biāo)準(zhǔn)的值.
- maximumPoolSize表示最大線程數(shù),就是核心線程數(shù)與非核心線程數(shù)之和, 公司的正式員工和請(qǐng)來(lái)的零時(shí)工(非核心線程),
現(xiàn)有的工作正式工干不完時(shí), 就會(huì)招來(lái)零時(shí)工幫忙干活. - keepAliveTime非核心線程最長(zhǎng)等待新任務(wù)的時(shí)間, 超過(guò)此時(shí)間, 該線程就會(huì)被銷毀; 就是相當(dāng)于零時(shí)工最長(zhǎng)摸魚時(shí)間,
公司里面是不養(yǎng)閑人的, 零時(shí)工長(zhǎng)時(shí)間沒有工作干就會(huì)被辭退了, 整體的策略, 正式員工保底, 臨時(shí)工動(dòng)態(tài)調(diào)節(jié). - unit上面參數(shù)的時(shí)間單位.
- workQueue線程池的任務(wù)隊(duì)列(阻塞隊(duì)列), 通過(guò)submit方法將任務(wù)注冊(cè)到該隊(duì)列中.
- threadFactory線程工廠, 線程創(chuàng)建的方案.
- handler拒絕策略, 描述了當(dāng)線程池任務(wù)隊(duì)列滿了, 如果繼續(xù)添加任務(wù)會(huì)以什么樣的方式處理.
在Java標(biāo)準(zhǔn)庫(kù)中提供了4個(gè)拒絕策略, 如下:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-426322.html
Modifier and Type | Class and Description |
---|---|
static class | ThreadPoolExecutor.AbortPolicy 如果任務(wù)太多, 隊(duì)列滿了, 直接拋出異常RejectedExecutionException . |
static class | ThreadPoolExecutor.CallerRunsPolicy 如果任務(wù)太多, 隊(duì)列滿了, 多出來(lái)的任務(wù), 誰(shuí)加的, 誰(shuí)負(fù)責(zé)執(zhí)行. |
static class | ThreadPoolExecutor.DiscardOldestPolicy 如果任務(wù)太多, 隊(duì)列滿了, 丟棄最舊的未處理的任務(wù). |
static class | ThreadPoolExecutor.DiscardPolicy 如果任務(wù)太多, 隊(duì)列滿了, 丟棄多出來(lái)的任務(wù). |
實(shí)現(xiàn)線程池
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class MyThreadPool{
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
//n表示線程的數(shù)量
public MyThreadPool(int n){
//創(chuàng)建線程
for (int i = 0; i < n; i++) {
Thread t = new Thread(()->{
while(true){
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t.start();
}
}
//
public void submit(Runnable runnable) {
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadDemo27 {
public static void main(String[] args) {
MyThreadPool pool = new MyThreadPool(10);
for (int i = 0; i < 1000; i++) {
int n = i;
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("hello " + n);
}
});
}
}
}
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-426322.html
到了這里,關(guān)于【JavaEE初階】多線程(四)阻塞隊(duì)列 定時(shí)器 線程池的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!