目錄
1、線程中執(zhí)行任務(wù)的方式
2、Executor 框架
2.1 - 線程的執(zhí)行策略
2.2 - 線程池
2.3 - Executor 的生命周期
2.4 - 延任務(wù)與周期任務(wù)
3、找出可利用的并行性-代碼示例
3.1 - 單線程的 I/O 操作
3.2 - 攜帶任務(wù)結(jié)果的 Callable 與 Future(重要)
3.3 - 使用 Future 實(shí)現(xiàn)頁(yè)面渲染器
3.5 - CompletionService:Executor 與 BlockingQueue
????????大多數(shù)并發(fā)應(yīng)用程序都是圍繞“任務(wù)執(zhí)行 (Task Execution)”來(lái)構(gòu)造的:任務(wù)通常是一些抽象的且離散的工作單元。通過(guò)把應(yīng)用程序的工作分解到多個(gè)任務(wù)中,可以簡(jiǎn)化程序的組織結(jié)構(gòu),提供一種事務(wù)邊界來(lái)優(yōu)化錯(cuò)誤恢復(fù)過(guò)程,以及提供一種并行工作結(jié)構(gòu)來(lái)提升并發(fā)性。//目的:如何把一個(gè)工作拆解成多個(gè)任務(wù),并發(fā)執(zhí)行->清晰的任務(wù)邊界(獨(dú)立任務(wù)有利于并發(fā))
1、線程中執(zhí)行任務(wù)的方式
????????串行執(zhí)行:在服務(wù)器應(yīng)用程序中,串行處理機(jī)制通常都無(wú)法提供高吞吐率或快速響應(yīng)性。//一次只能執(zhí)行一個(gè)請(qǐng)求,主線程阻塞
????????并行執(zhí)行:通過(guò)多個(gè)線程來(lái)提供服務(wù),從而實(shí)現(xiàn)更高的響應(yīng)性。//多線程執(zhí)行,不阻塞主線程(將任務(wù)從主線程中分離出來(lái))-> 更快的響應(yīng)性和更高的吞吐率
????????需要注意的是,線程生命周期的開(kāi)銷(xiāo)非常高(Java中創(chuàng)建線程需要內(nèi)核態(tài)的支持)?;钴S的線程會(huì)消耗系統(tǒng)資源,尤其是內(nèi)存(TCB)。
????????所以,在一定的范圍內(nèi),增加線程可以提高系統(tǒng)的吞吐率,但如果超出了這個(gè)范圍,再創(chuàng)建更多的線程只會(huì)降低程序的執(zhí)行速度,并且如果過(guò)多地創(chuàng)建一個(gè)線程,那么整個(gè)應(yīng)用程序?qū)⒈罎?。要想避免這種危險(xiǎn),就應(yīng)該對(duì)應(yīng)用程序可以創(chuàng)建的線程數(shù)量進(jìn)行限制,并且全面地測(cè)試應(yīng)用程序,從而確保在線程數(shù)量達(dá)到限制時(shí),程序也不會(huì)耗盡盜源。//選擇合適的線程數(shù)量,并需要對(duì)線程資源進(jìn)行管理(避免無(wú)限創(chuàng)建線程)
2、Executor 框架
????????Executor 基于生產(chǎn)者- 消費(fèi)者模式,提交任務(wù)的操作相當(dāng)于生產(chǎn)者 ,執(zhí)行任務(wù)的線程則相當(dāng)于消費(fèi)者。如果要在程序中實(shí)現(xiàn)一個(gè)生產(chǎn)者-消費(fèi)者的設(shè)計(jì),那么最簡(jiǎn)單的方式通常就是使用 Executor。
????????在 TaskExecutionWebServer 中,通過(guò)使用 Executor,將請(qǐng)求處理任務(wù)的提交與任務(wù)的實(shí)際執(zhí)行解耦開(kāi)來(lái),代碼如下所示://Executor是一個(gè)接口,可使用Java提供的實(shí)現(xiàn),也可以自己去實(shí)現(xiàn)
public class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
//任務(wù)
Runnable task = () -> handleRequest(connection);
//使用Executor執(zhí)行任務(wù)
exec.execute(task);
}
}
private static void handleRequest(Socket connection) {
// request-handling logic here
}
}
2.1 - 線程的執(zhí)行策略
????????通過(guò)將任務(wù)的提交與執(zhí)行解耦開(kāi)來(lái),從而無(wú)須太大的困難就可以為某種類(lèi)型的任務(wù)指定和修改執(zhí)行策略。在執(zhí)行策略中定義了任務(wù)執(zhí)行的 “What、Where、When、How” 等方面,包括:
- 在什么線程中執(zhí)行任務(wù)??
- 任務(wù)按照什么順序執(zhí)行 (FIFO、LIFO、優(yōu)先級(jí))??
- 有多少個(gè)任務(wù)能并發(fā)執(zhí)行?
- 在隊(duì)列中有多少個(gè)任務(wù)在等待執(zhí)行?
- 如果系統(tǒng)由于過(guò)載而需要拒絕一個(gè)任務(wù),那么應(yīng)該選擇哪一個(gè)任務(wù)?另外,如何通知應(yīng)用程序有任務(wù)被拒絕?
- 在執(zhí)行一個(gè)任務(wù)之前或之后,應(yīng)該進(jìn)行哪些動(dòng)作??
????????各種執(zhí)行策略都是一種資源管理工具,最佳策略取決于可用的計(jì)算資源以及對(duì)服務(wù)質(zhì)量的需求。通過(guò)限制并發(fā)任務(wù)的數(shù)量,可以確保應(yīng)用程序不會(huì)由于資源耗盡而失敗,或者由于在稀缺資源上發(fā)生競(jìng)爭(zhēng)而嚴(yán)重影響性能。通過(guò)將任務(wù)的提交與任務(wù)的執(zhí)行策略分離開(kāi)來(lái),有助于在部署階段選擇與可用硬件資源最匹配的執(zhí)行策略。//對(duì)于如何執(zhí)行任務(wù)的描述
2.2 - 線程池
????????線程池,指管理一組同構(gòu)工作線程的資源池。線程池是與工作隊(duì)列 (Work Oueue) 密切相關(guān)的,其中在工作隊(duì)列中保存了所有等待執(zhí)行的任務(wù)。工作線程 (Worker Thread) 的任務(wù)很簡(jiǎn)單:從工作隊(duì)列中獲取一個(gè)任務(wù),執(zhí)行任務(wù),然后返回線程池并等待下一個(gè)任務(wù)。//線程池->儲(chǔ)存線程,工作隊(duì)列->儲(chǔ)存任務(wù)
? ? ? ? Java 類(lèi)庫(kù)提供了一個(gè)靈活的線程池以及一些有用的默認(rèn)配置。可以通過(guò)調(diào)用 Executors 中的靜態(tài)工廠方法之一來(lái)創(chuàng)建一個(gè)線程池:
????????newFixedThreadPool。newFixedThreadPool 將創(chuàng)建一個(gè)固定長(zhǎng)度的線程池,每當(dāng)提交一個(gè)任務(wù)時(shí)就創(chuàng)建一個(gè)線程,直到達(dá)到線程池的最大數(shù)量,這時(shí)線程池的規(guī)模將不再變化(如果某個(gè)線程由于發(fā)生了未預(yù)期的 Exception 而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新的線程)。//限制線程數(shù)量
????????newCachedThreadPool。newCachedThreadPool 將創(chuàng)建一個(gè)可緩存的線程池,如果線程池的當(dāng)前規(guī)模超過(guò)了處理需求時(shí),那么將回收空閑的線程,而當(dāng)需求增加時(shí),則可以添加新的線程,線程池的規(guī)模不存在任何限制。//不限制線程數(shù)量
????????newSingleThreadExecutor。newSingleThreadExecutor 是一個(gè)單線程的 Executor,它創(chuàng)建單個(gè)工作者線程來(lái)執(zhí)行任務(wù),如果這個(gè)線程異常結(jié)束,會(huì)創(chuàng)建另一個(gè)線程來(lái)替代。newSingleThreadExecutor 能確保依照任務(wù)在隊(duì)列中的順序來(lái)串行執(zhí)行(例如 FIFO、LIFO優(yōu)先級(jí))。//按順序執(zhí)行任務(wù)
????????newScheduledThreadPool。newScheduledThreadPool 創(chuàng)建了一個(gè)固定長(zhǎng)度的線程池,而且以延遲或定時(shí)的方式來(lái)執(zhí)行任務(wù),類(lèi)似于 Timer。//執(zhí)行定時(shí)任務(wù)
2.3 - Executor 的生命周期
????????為了解決執(zhí)行服務(wù)的生命周期問(wèn)題,Executor 擴(kuò)展了 ExecutorService 接口,添加了一些用于生命周期管理的方法,同時(shí)還有一些用于任務(wù)提交的便利方法。
/*
* 該Executor提供管理線程池終止的方法,以及提供用于跟蹤一個(gè)或多個(gè)異步任務(wù)進(jìn)度的Future的方法。
*/
public interface ExecutorService extends Executor {
//1-關(guān)閉執(zhí)行器(線程池):不再接收新任務(wù),然后等待正在執(zhí)行的線程執(zhí)行完畢
void shutdown();
//關(guān)閉執(zhí)行器(線程池):停止所有正在執(zhí)行的任務(wù),并返回等待執(zhí)行的任務(wù)列表
List<Runnable> shutdownNow();
//2-判斷執(zhí)行器(線程池)是否關(guān)閉
boolean isShutdown();
//3-判斷執(zhí)行器(線程池)關(guān)閉后所有任務(wù)是否已完成
//注意,除非先調(diào)用shutdown/shutdownNow,否則isTerminated永遠(yuǎn)不會(huì)為true。
boolean isTerminated();
//4-阻塞當(dāng)前線程,直到所有任務(wù)完成或中斷或當(dāng)前等待超時(shí)
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
//5-執(zhí)行給定的任務(wù),并返回表示該任務(wù)的Future。
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//6-執(zhí)行給定的任務(wù),并在所有任務(wù)完成后返回保存其狀態(tài)和結(jié)果的future列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
//7-執(zhí)行給定的任務(wù),如果有成功完成的任務(wù),則返回成功完成的任務(wù)的結(jié)果
//未完成的任務(wù)將被取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
????????ExecutorService 的生命周期有 3 種狀態(tài):運(yùn)行、關(guān)閉和已終止。ExecutorService 在初始創(chuàng)建時(shí)處于運(yùn)行狀態(tài)。
- shutdown 方法將執(zhí)行平緩的關(guān)閉過(guò)程:不再接受新的任務(wù),同時(shí)等待已經(jīng)提交的任務(wù)執(zhí)行完成一一包括那些還未開(kāi)始執(zhí)行的任務(wù)。
- shutdownNow 方法將執(zhí)行粗暴的關(guān)閉過(guò)程:它將嘗試取消所有運(yùn)行中的任務(wù),并且不再啟動(dòng)隊(duì)列中尚未開(kāi)始執(zhí)行的任務(wù)。
????????在 ExecutorService 關(guān)閉后提交的任務(wù)將由 "拒絕執(zhí)行處理器(Reiected Execution Handler)" 來(lái)處理,它會(huì)拋棄任務(wù),或者使得 execute 方法拋出一個(gè)未檢查的 ReiectedExecutionException。等所有任務(wù)都完成后,ExecutorService 將轉(zhuǎn)入終止?fàn)顟B(tài)。
????????可以調(diào)用 awaitTermination 來(lái)等待 ExecutorService 到達(dá)終止?fàn)顟B(tài),或者通過(guò)調(diào)用isTerminated 來(lái)輪詢 ExecutorService 是否已經(jīng)終止。通常在調(diào)用 awaitTermination 之后會(huì)立即調(diào)用 shutdown,從而產(chǎn)生同步地關(guān)閉 ExecutorService 的效果。//兩個(gè)方法結(jié)合使用,安全的關(guān)閉?ExecutorService:awaitTermination +?shutdown
2.4 - 延任務(wù)與周期任務(wù)
????????Timer 類(lèi)負(fù)責(zé)管理延遲任務(wù)以及周期任務(wù)。然而,Timer 存在一些缺陷,因此應(yīng)該考慮使用 ScheduledThreadPoolExecutor 來(lái)代替它。可以通過(guò) ScheduledThreadPoolExecutor 的構(gòu)造函數(shù)或 newScheduledThreadPool 工方法來(lái)創(chuàng)建該類(lèi)的對(duì)象。
????????Timer 類(lèi)的問(wèn)題:
????????(1)Timer 在執(zhí)行所有定時(shí)任務(wù)時(shí)只會(huì)創(chuàng)建一個(gè)線程。如果某個(gè)任務(wù)的執(zhí)行時(shí)間過(guò)長(zhǎng),那么將破壞其他 TimerTask 的定時(shí)精確性。//任務(wù)執(zhí)行時(shí)間重疊引發(fā)的問(wèn)題,使用多線程可避免
? ? ? ? (2)Timer 線程并不捕獲異常,因此當(dāng) TimerTask 拋出未檢查的異常時(shí)將終止定時(shí)線程。這種情況下,Timer 也不會(huì)恢復(fù)線程的執(zhí)行,而是會(huì)錯(cuò)誤地認(rèn)為整個(gè) Timer 都被取消了。因此,已經(jīng)被調(diào)度但尚未執(zhí)行的 TimerTask 將不會(huì)再執(zhí)行,新的任務(wù)也不能被調(diào)度,這個(gè)問(wèn)題稱(chēng)之為"線程泄漏"。//不能處理異常,不能從異常中恢復(fù)
3、找出可利用的并行性-代碼示例
????????//從示例中總結(jié)方法
3.1 - 單線程的 I/O 操作
????????例如,下邊的一個(gè)頁(yè)面渲染器程序,程序中圖像下載過(guò)程的大部分時(shí)間都是在等待 I/O 操作執(zhí)行完成,在這期間 CPU 幾乎不做任何工作。因此,這種串行執(zhí)行方法沒(méi)有充分地利用 CPU,使得用戶在看到最終頁(yè)面之前要等待過(guò)長(zhǎng)的時(shí)間。
import java.util.*;
/**
* 使用單線的程渲染器
*/
public abstract class SingleThreadRenderer {
/**
* 渲染頁(yè)面
*/
void renderPage(CharSequence source) {
//1-加載文本數(shù)據(jù)
renderText(source);
List<ImageData> imageData = new ArrayList<>();
//下載多個(gè)圖片資源
for (ImageInfo imageInfo : scanForImageInfo(source)) {
//TODO:圖像下載大部分時(shí)間都是I/O操作
imageData.add(imageInfo.downloadImage());
}
for (ImageData data : imageData) {
//2-加載圖片數(shù)據(jù)
renderImage(data);
}
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
????????通過(guò)將問(wèn)題分解為多個(gè)獨(dú)立的任務(wù)并發(fā)執(zhí)行,能夠獲得更高的 CPU 利用率和響應(yīng)靈敏度。
3.2 - 攜帶任務(wù)結(jié)果的 Callable 與 Future(重要)
????????Executor 框架使用 Runnable 作為其基本的任務(wù)表示形式。不過(guò) Runnable 具有很大的局限性,雖然 run 方法能通過(guò)寫(xiě)入日志文件或者將結(jié)果放入某個(gè)共享的數(shù)據(jù)結(jié)構(gòu)來(lái)保存執(zhí)行結(jié)果,但是它不能返回一個(gè)值或拋出一個(gè)受檢查的異常。//Runnable不具備返回值
????????許多任務(wù)實(shí)際上都存在延遲的計(jì)算,比如執(zhí)行數(shù)據(jù)庫(kù)查詢,從網(wǎng)絡(luò)上獲取資源,或者計(jì)算某個(gè)復(fù)雜的功能等。對(duì)于這些任務(wù),Callable 是一種更好的抽象:它認(rèn)為主入口點(diǎn)(即 call)將返回一個(gè)值,并可能拋出一個(gè)異常。//通過(guò)?Callable 返回的 Future 對(duì)象能取消未執(zhí)行的任務(wù)?
????????Runnable 和 Callable 描述的都是抽象的計(jì)算任務(wù)。這些任務(wù)通常是有范圍的,即都有一個(gè)明確的起始點(diǎn),并且最終會(huì)結(jié)束。Executor 執(zhí)行的任務(wù)有 4 個(gè)生命周期階段:創(chuàng)建、提交、開(kāi)始和完成。由于有些任務(wù)可能要執(zhí)行很長(zhǎng)的時(shí)間,因此通常希望能夠取消這些任務(wù)。在 Executor 框架中,已提交但尚未開(kāi)始的任務(wù)可以取消,但對(duì)于那些已經(jīng)開(kāi)始執(zhí)行的任務(wù),只有當(dāng)它們能響應(yīng)中斷時(shí),才能取消。取消一個(gè)已經(jīng)完成的任務(wù)不會(huì)有任何影響。//任務(wù)可以提交也可以取消,執(zhí)行任務(wù)是具有生命周期的
????????Future 表示一個(gè)任務(wù)的生命周期,并提供了相應(yīng)的方法來(lái)判斷任務(wù)是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。在 Future 規(guī)范中包含的隱含意義是,任務(wù)的生命周期只能前進(jìn),不能后退,就像 ExecutorService 的生命周期一樣。當(dāng)某個(gè)任務(wù)完成后,它就永遠(yuǎn)停留在 "完成" 狀態(tài)上。//任務(wù)執(zhí)行生命周期的順序不能打亂,必須按照規(guī)定的順序進(jìn)行
????????get 方法的行為取決于任務(wù)的狀態(tài)(尚未開(kāi)始、正在運(yùn)行、已完成)。
????????如果任務(wù)已經(jīng)完成,那么 get 會(huì)立即返回或者拋出一個(gè) Exception,如果任務(wù)沒(méi)有完成,那么 get 將阻塞并直到任務(wù)完成。//get方法可能獲得結(jié)果也可能拋出異常
????????如果任務(wù)拋出了異常,那么 get 將該異常封裝為 ExecutionException 并重新拋出。如果任務(wù)被取消,那么 get 將拋出 CancellationException。如果 get 拋出了?ExecutionException,那么可以通過(guò) getCause 來(lái)獲得被封裝的初始異常。
//Future接口
public interface Future<V> {
//嘗試取消執(zhí)行此任務(wù)。
//如果任務(wù)已經(jīng)完成或取消,或者由于其他原因無(wú)法取消,則此方法不起作用(返回false)。
// 否則,如果在調(diào)用cancel時(shí)該任務(wù)尚未啟動(dòng),則該任務(wù)不應(yīng)運(yùn)行。
//如果任務(wù)已經(jīng)啟動(dòng),那么mayInterruptIfRunning參數(shù)決定是否中斷正在執(zhí)行的任務(wù),
// true 進(jìn)行中斷,false允許程序執(zhí)行完成。
boolean cancel(boolean mayInterruptIfRunning);
//如果此任務(wù)在正常完成之前被取消,則返回true。
boolean isCancelled();
//如果此任務(wù)完成,則返回true。
//完成可能是由于正常終止、異?;蛉∠谒羞@些情況下,此方法都將返回true。
boolean isDone();
//等待計(jì)算完成,然后檢索其執(zhí)行結(jié)果。
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
????????可以通過(guò)許多種方法創(chuàng)建一個(gè) Future 來(lái)描述任務(wù)。ExecutorService 中的所有 submit 方法都將返回一個(gè) Future,從而將一個(gè) Runnable 或 Callable 提交給 Executor,并得到一個(gè) Future 用來(lái)獲得任務(wù)的執(zhí)行結(jié)果或者取消任務(wù)。
????????還可以顯式地為某個(gè)指定的 Runnable 或 Callable 實(shí)例化一個(gè) FutureTask。由于 FutureTask實(shí)現(xiàn)了 Runnable,因此可以將它提交給 Executor 來(lái)執(zhí)行或者直接調(diào)用它的 run 方法。
3.3 - 使用 Future 實(shí)現(xiàn)頁(yè)面渲染器
????????為了使頁(yè)面渲染器實(shí)現(xiàn)更高的并發(fā)性,首先將渲染過(guò)程分解為兩個(gè)任務(wù),一個(gè)是渲染所有的文本,另一個(gè)是下載所有的圖像。因?yàn)槠渲幸粋€(gè)任務(wù)是 CPU 密集型,而另一個(gè)任務(wù)是 I/O 密集型,因此這種方法即使在單 CPU 系統(tǒng)上也能提升性能。//思路一:將I/O密集型任何和CPU密集型任務(wù)分開(kāi)
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 使用Future的渲染器
*/
public abstract class FutureRenderer {
private final ExecutorService executor = Executors.newCachedThreadPool();
void renderPage(CharSequence source) {
//1-獲取圖片路徑信息
final List<ImageInfo> imageInfos = scanForImageInfo(source);
//2-下載圖片任務(wù)-> I/O密集型
Callable<List<ImageData>> task = () -> {
List<ImageData> result = new ArrayList<>();
for (ImageInfo imageInfo : imageInfos) {
//下載圖片資源
result.add(imageInfo.downloadImage());
}
return result;
};
//3-使用線程池執(zhí)行下載圖片任務(wù)
Future<List<ImageData>> future = executor.submit(task);
//4-加載文本數(shù)據(jù)-> CPU密集型
renderText(source);
//5-加載圖片數(shù)據(jù)
try {
//TODO:同步獲取所有結(jié)果,線程阻塞
List<ImageData> imageData = future.get();
for (ImageData data : imageData) {
renderImage(data);
}
} catch (InterruptedException e) {
// 重新設(shè)置線程的中斷標(biāo)記
Thread.currentThread().interrupt();
// 我們不需要這個(gè)結(jié)果,所以也取消這個(gè)任務(wù)
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
????????FutureRenderer 使得染文本任務(wù)與下載圖像數(shù)據(jù)的任務(wù)并發(fā)地執(zhí)行。當(dāng)所有圖像下載完后,會(huì)顯示到頁(yè)面上。這將提升用戶體驗(yàn),不僅使用戶更快地看到結(jié)果,還有效利用了并行性,但我們還可以做得更好。用戶不必等到所有的圖像都下載完成,而希望看到每當(dāng)下載完一幅圖像時(shí)就立即顯示出來(lái)。//要求不等到所有結(jié)果出來(lái)才渲染,而是出來(lái)就一個(gè)渲染一個(gè)
3.5 - CompletionService:Executor 與 BlockingQueue
????????如果向 Executor 提交了一組計(jì)算任務(wù),并且希望在計(jì)算完成后獲得結(jié)果,那么可以保留與每個(gè)任務(wù)關(guān)聯(lián)的 Future,然后反復(fù)使用 get 方法,同時(shí)將參數(shù) timeout 指定為 0,從而通過(guò)輪詢來(lái)判斷任務(wù)是否完成。這種方法雖然可行,但卻有些繁瑣。幸運(yùn)的是,還有一種更好的方法:完成服務(wù)(CompletionService)。
????????CompletionService 將 Executor 和 BlockingQueue 的功能融合在一起。你可以將 Callable 任務(wù)提交給它來(lái)執(zhí)行,然后使用類(lèi)似于隊(duì)列操作的 take 和 poll 等方法來(lái)獲得已完成的結(jié)果,而這些結(jié)果會(huì)在完成時(shí)將被封裝為 Future。ExecutorCompletionService 實(shí)現(xiàn)了 CmpletionService 并將計(jì)算部分委托給一個(gè) Executor。
????????ExecutorCompletionService 的實(shí)現(xiàn)非常簡(jiǎn)單。在構(gòu)造函數(shù)中創(chuàng)建一個(gè) BlockingQueue 來(lái)保存計(jì)算完成的結(jié)果。當(dāng)計(jì)算完成時(shí),調(diào)用 FutureTask 中的 done 方法。當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)將首先包裝為一個(gè) QueueingFuture,這是 FutureTask 的一個(gè)子類(lèi),然后再改寫(xiě)子類(lèi)的 done 方法,并將結(jié)果放入 BlockingQueue 中。
????????使用 CompletionService 實(shí)現(xiàn)頁(yè)面渲染器:
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* 使用 CompletionService 實(shí)現(xiàn)頁(yè)面渲染器
*/
public abstract class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
//1-使用CompletionService執(zhí)行任務(wù)
CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
for (final ImageInfo imageInfo : info) {
completionService.submit(() -> imageInfo.downloadImage());
}
renderText(source);
try {
//2-從CompletionService中獲取執(zhí)行任務(wù)的結(jié)果,遍歷次數(shù)為提交任務(wù)的數(shù)量
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
????????通過(guò) CompletionService 從兩個(gè)方面來(lái)提高頁(yè)面染器的性能:縮短總運(yùn)行時(shí)間以及提高響應(yīng)性。為每一幅圖像的下載都創(chuàng)建一個(gè)獨(dú)立任務(wù),并在線程池中執(zhí)行它們,從而將串行的下載過(guò)程轉(zhuǎn)換為并行的過(guò)程,這將減少下載所有圖像的總時(shí)間。此外,通過(guò)從 CompletionService 中獲取結(jié)果以及使每張圖片在下載完成后立刻顯示出來(lái),能使用戶獲得一個(gè)更加動(dòng)態(tài)和更高響應(yīng)性的用戶界面。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-666540.html
????????至此,全文結(jié)束。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-666540.html
到了這里,關(guān)于并發(fā)編程5:如何執(zhí)行任務(wù)?的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!