3.1 新增定時任務(wù)池
11.定時任務(wù)&定時線程池詳解
? 當(dāng)我們不用任務(wù)框架時,我們想自己寫一個定時任務(wù)時,我們能想起那個工具類呢?Timer ?還有嗎?不知道了,下面我們要講下ScheduledThreadPoolExecutor,定時任務(wù)線程池,可以執(zhí)行一次任務(wù),還可以執(zhí)行周期性任務(wù)。
1.0 ScheduledThreadPoolExecutor的用法
定時線程池的類的結(jié)構(gòu)圖如下:
從結(jié)構(gòu)圖上可以看出定時線程池ScheduledThreadPoolExecutor繼承了線程池ThreadPoolExecutor,也就是說它們之間肯定有相同的行為和屬性。
ScheduledThreadPoolExecutor常用發(fā)的方法如下:
1)schedule():一次行任務(wù),延遲執(zhí)行,任務(wù)只執(zhí)行一次。
2)scheduleAtFixedRate():周期性任務(wù),不不等待任務(wù)結(jié)束,每隔周期時間執(zhí)行一次,新任務(wù)放進(jìn)隊列中.
3)scheduleWithFixedDelay():周期性任務(wù),等待任務(wù)結(jié)束,每隔周期時間執(zhí)行一次.
代碼樣例入下:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
//無返回值 延遲5秒返回
scheduledThreadPoolExecutor.schedule(()->{
System.out.println("我要延遲5秒執(zhí)行,只執(zhí)行一次 ");
},5000, TimeUnit.MICROSECONDS);
}
}
可以用在啟動項目時需要等待對象的加載,延遲執(zhí)行一個任務(wù)。
帶返回值的延遲執(zhí)行任務(wù)如下:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
//有返回值任務(wù) 可以用作異步處理任務(wù)不用等待結(jié)果
ScheduledFuture<Integer> future = scheduledThreadPoolExecutor.schedule(()->{
System.out.println("我要延遲5秒執(zhí)行,只執(zhí)行一次 ");
return 1;
},5000, TimeUnit.MICROSECONDS);
System.out.println(future.get());
}
}
待返回值的任務(wù),可以用于異步處理一個任務(wù),等主線任務(wù)執(zhí)行完,主要任務(wù)要知道異步任務(wù)的執(zhí)行狀態(tài)。
周期性任務(wù):參數(shù)一樣,方法名字不一樣 例子如下
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
//周期性的任務(wù) 發(fā)心跳 service1-service2 每次5s,發(fā)送一個心跳 下面的例子是不管任務(wù)是否執(zhí)行完,一直想隊列中放。 一個任務(wù)占一個線程。
//scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
//等待任務(wù)執(zhí)行結(jié)束,在間隔2秒執(zhí)行。
scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
System.out.println("send heart beat");
long startTime = System.currentTimeMillis(),nowTime = startTime;
while((nowTime-startTime)<5000){
nowTime = System.currentTimeMillis();
try{
Thread.sleep(100);
}catch (InterruptedException e ){
e.printStackTrace();
}
}
System.out.println("task over .....");
//任務(wù)啟動多久之后 ,周期 每2s執(zhí)行一次,時間單位
},1000,2000,TimeUnit.MILLISECONDS);
}
}
2.0 定時線程池使用場景
2.1 分布式鎖-redis
? 當(dāng)使用setnx獲取分布式鎖(鎖是有失效時間的),但是害怕任務(wù)沒有執(zhí)行完成鎖失效了,怎么辦呢?可以在任務(wù)的開始用一個定時線程池每隔一段時間看下鎖是否失效如果沒失效延長失效時間,如果失效不做處理。這樣可以保證任務(wù)執(zhí)行完成。
2.2 服務(wù)注冊中心
服務(wù)注冊客戶端每隔多久向服務(wù)中心發(fā)送下自己的ip,端口,服務(wù)名字及服務(wù)狀態(tài)。
2.3 和Timer的不同
import java.util.Timer;
import java.util.TimerTask;
public class TestTimer {
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println("send star -----");
throw new RuntimeException("2134 243");
}
}, 1000, 2000);
}
}
上面是的使用方法,從使用方法上和定時線程池的使用方法類似,都是周期性的執(zhí)行任務(wù);
不同的地方是:
Timer:單線程,線程掛了,不會再創(chuàng)建線程執(zhí)行任務(wù);
ScheduledThreadPoolExecutor:線程掛了,再提交任務(wù),線程池會創(chuàng)建新的線程執(zhí)行任務(wù)。
3.0 定時任務(wù)線程池實現(xiàn)原理
線程池執(zhí)行過程:調(diào)用sechedule相關(guān)方法時,會先把任務(wù)添加到隊列中,再又線程從隊列中取出執(zhí)行。
它接收SchduledFutureTask類型的任務(wù),是線程調(diào)度的最小單位,有三種提交方法:
1)schedule():一次行任務(wù),延遲執(zhí)行,任務(wù)只執(zhí)行一次。
2)scheduleAtFixedRate():周期性任務(wù),不不等待任務(wù)結(jié)束,每隔周期時間執(zhí)行一次,新任務(wù)放進(jìn)隊列中.
3)scheduleWithFixedDelay():周期性任務(wù),等待任務(wù)結(jié)束,每隔周期時間執(zhí)行一次.
它采用DelayedWorkQueue存儲等待的任務(wù):
1)DelayedWorkQueue內(nèi)部封裝了一個PriorityQueue,根據(jù)它會根據(jù)time的先后時間排序,若time相同則根據(jù)sequenceNumber排序;
2)DelayedWorkQueue是一個無界隊列;
3.1 SchduledFutureTask
SchduledFutureTask 接收的參數(shù)(成員變量):
1)private long time :任務(wù)開始的時間;
2)private final long sequenceNumber:任務(wù)的序號;
3)private final long period:任務(wù)執(zhí)行的間隔;
工作線程的執(zhí)行 過程:
- 工作線程會 從DelayedQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
- 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時間,再次放回DelayedQueue
ScheduledThreadPoolExecutor會把執(zhí)行的任務(wù)放到工作隊列DelayedQueue中,DelayedQueue封裝了一個PriorityQueue,PriorityQueue會對隊列中的SchduledFutureTask 進(jìn)行排序,具體的排序算法如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
1)首先按照time排序,time小的排在前面,time大的排在后面;
2)如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面。如果兩個task的執(zhí)行時間相同,優(yōu)先執(zhí)行先提交的task.
ScheduledFutureTaskn的run方法實現(xiàn):
run方法是調(diào)度task的核心,task 的執(zhí)行實際是run方法的執(zhí)行。
public void run() {
boolean periodic = isPeriodic();
//如果當(dāng)前線程池已經(jīng)不支持執(zhí)行任務(wù),則取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果不需要周期性執(zhí)行,則直接執(zhí)行run方法
else if (!periodic)
ScheduledFutureTask.super.run();
//如果需要周期性執(zhí)行,先執(zhí)行,后設(shè)置下次執(zhí)行時間
else if (ScheduledFutureTask.super.runAndReset()) {
//計算下次執(zhí)行時間
setNextRunTime();
//再次將執(zhí)行任務(wù)添加到隊列中,重復(fù)執(zhí)行。
reExecutePeriodic(outerTask);
}
}
}
reExecutePeriodic 源碼如下:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
該方法和delayExecute方法類似,不同的是:
1)由于調(diào)用reExecutePeriodic 方法時已經(jīng)執(zhí)行過一次周期性任務(wù)了,所以不會reject當(dāng)前任務(wù);
2)傳入的任務(wù)一定是周期性任務(wù)
3.2 線程池任務(wù)提交
首先是schedule方法,該方法指任務(wù)在指定延遲時間到達(dá)后觸發(fā),只會執(zhí)行一次。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
//參數(shù)校驗
if (callable == null || unit == null)
throw new NullPointerException();
//這是一個嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask
//然后在調(diào)用decorateTask進(jìn)行包裝,該方法是留給用戶去擴(kuò)展的,默認(rèn)是個空方法。
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
scheduleWithFixedDelay周期性執(zhí)行任務(wù):
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 將任務(wù)包裝成 ScheduledFutureTask 類型
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// 再次裝飾任務(wù),可以復(fù)寫 decorateTask 方法,定制化任務(wù)
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 放入延時隊列中,ScheduledFutureTask 是接口 RunnableScheduledFuture 的一個實現(xiàn)類
// 所以放入隊列還是 ScheduledFutureTask 類型的
delayedExecute(t);
return t;
}
任務(wù)提交方法delayedExecute源碼如下:
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池已經(jīng)關(guān)閉,則 使用決絕策略把提交任務(wù)拒絕掉
if (isShutdown())
reject(task);
else {
//與ThreadPoolExecutor不同的,這里直接把任務(wù)加入延遲隊列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
//如果當(dāng)前狀態(tài)無法執(zhí)行,則取消
remove(task))
task.cancel(false);
else
//這里增加了一個worker線程,避免提交的任務(wù)沒有worker去執(zhí)行
//原因就是該類沒有像ThreadPoolExecutor 一樣,核心worker滿了,才放入隊列
ensurePrestart();
}
}
3.3 DelayedWorkerQueue
ScheduledThreadPoolExecutor之所以要在自己實現(xiàn)阻塞的工作隊列,是因為ScheduledThreadPoolExecutor要求的工作隊列有些特殊。
DelayedWorkerQueue是一個基于堆的數(shù)據(jù)結(jié)構(gòu),類似于DelayQueue和PriorityQueue。在執(zhí)行定時任務(wù)的時候,每個任務(wù)執(zhí)行時間都不同,所以DelayedWorkerQueue的工作就是按照執(zhí)行時間的升序來排列,執(zhí)行時間距離當(dāng)前時間越近的任務(wù)在隊列的qianmian(注意:這里的順序并不是絕對的,堆中的排序只保證了自己的下次執(zhí)行時間要比父節(jié)點(diǎn)的下次執(zhí)行時間要大,而葉子節(jié)點(diǎn)之間并不是順序的。)
堆結(jié)構(gòu)圖如下
可知,DelayedWorkerQueue是一個基于最小堆結(jié)構(gòu)的隊列。堆結(jié)構(gòu)可以使用數(shù)組表示,可以轉(zhuǎn)換成如下的數(shù)組
在這種結(jié)構(gòu)中,可以發(fā)下如下特點(diǎn):
假設(shè)索引值從0開始,子節(jié)點(diǎn)的索引值為K,父節(jié)點(diǎn)的索引值為P,則:
- 一個節(jié)點(diǎn)的左節(jié)點(diǎn)的索引為:k=p*2+1;
- 一個節(jié)點(diǎn)的右節(jié)點(diǎn)的索引為:k=(p+1)*2;
- 一個節(jié)點(diǎn)的父節(jié)點(diǎn)的索引為:p=(k-1)/2;
為什么要使用DelayedWorkerQueue呢?
定時任務(wù)執(zhí)行時需要取出最近要執(zhí)行的任務(wù),所以任務(wù)在隊列中每次出隊時,一定要是當(dāng)前隊列中執(zhí)行時間最靠前的,所以自然要使用優(yōu)先級隊列。
DelayedWorkerQueue是一個優(yōu)先級隊列,它可以保證每次出隊列的任務(wù)都是當(dāng)前隊列中執(zhí)行時間最靠前的,由于它是基于堆結(jié)構(gòu)的隊列,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時的最壞時間復(fù)雜度是O(logN).
DelayedWorkerQueue的屬性:文章來源:http://www.zghlxwxcb.cn/news/detail-403149.html
//隊列初始化容量
private static final int INITIAL_CAPACITY = 16;
//根據(jù)初始化容量創(chuàng)建RunnableScheduledFuture 類型的數(shù)組;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader 線程
private Thread leader = null;
// 當(dāng)較新的任務(wù)在隊列的頭部可用時,或者新線程可能需要成為leader,則通過該條件發(fā)出信號
private final Condition available = lock.newCondition();
注意:這里的leader,它是Leader-Follower模式的變體,用于減少不必要的定時等待。什么意思呢?對于多線程的網(wǎng)絡(luò)模型來說 所有線程會有三種身份中的一種:leader和follower,以及一個干活中的狀態(tài):proccesser。它的基木原則就是,永遠(yuǎn)最多只有一個leader,而所有follower都在等待成為leader。線程池啟動時會自動產(chǎn)生一個Leader負(fù)責(zé)等待網(wǎng)絡(luò)IO事件,當(dāng)有一個事件產(chǎn)生時,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了,去處理這個網(wǎng)絡(luò)事件,處理完畢后加入Follower線程等待隊列,等待下次成為Leader。這種方法可以增強(qiáng)CPU高速緩存相似性,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。文章來源地址http://www.zghlxwxcb.cn/news/detail-403149.html
到了這里,關(guān)于11.定時任務(wù)&定時線程池詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!