目錄
一、徹底理解Java的Future模式
二、為什么出現(xiàn)Future機制
2.1 Future 類有什么用?
三、Future的相關(guān)類圖
2.1 Future 接口
2.2 FutureTask 類
五、FutureTask源碼分析
5.1 state字段
5.2 其他變量
5.3 CAS工具初始化
5.4 構(gòu)造函數(shù)
5.5 jdk1.8和之前版本的區(qū)別
六、Callable 和 Future 有什么關(guān)系?
七、CompletableFuture 類
一、徹底理解Java的Future模式
先上一個場景:假如你突然想做飯,但是沒有廚具,也沒有食材。網(wǎng)上購買廚具比較方便,食材去超市買更放心。
實現(xiàn)分析:在快遞員送廚具的期間,我們肯定不會閑著,可以去超市買食材。所以,在主線程里面另起一個子線程去網(wǎng)購廚具。
但是,子線程執(zhí)行的結(jié)果是要返回廚具的,而run方法是沒有返回值的。所以,這才是難點,需要好好考慮一下。
模擬代碼1:
package test;
public class CommonCook {
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
// 第一步 網(wǎng)購廚具
OnlineShopping thread = new OnlineShopping();
thread.start();
thread.join(); // 保證廚具送到
// 第二步 去超市購買食材
Thread.sleep(2000); // 模擬購買食材時間
Shicai shicai = new Shicai();
System.out.println("第二步:食材到位");
// 第三步 用廚具烹飪食材
System.out.println("第三步:開始展現(xiàn)廚藝");
cook(thread.chuju, shicai);
System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");
}
// 網(wǎng)購廚具線程
static class OnlineShopping extends Thread {
private Chuju chuju;
@Override
public void run() {
System.out.println("第一步:下單");
System.out.println("第一步:等待送貨");
try {
Thread.sleep(5000); // 模擬送貨時間
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一步:快遞送到");
chuju = new Chuju();
}
}
// 用廚具烹飪食材
static void cook(Chuju chuju, Shicai shicai) {}
// 廚具類
static class Chuju {}
// 食材類
static class Shicai {}
}
運行結(jié)果:
第一步:下單
第一步:等待送貨
第一步:快遞送到
第二步:食材到位
第三步:開始展現(xiàn)廚藝
總共用時7013ms
可以看到,多線程已經(jīng)失去了意義。在廚具送到期間,我們不能干任何事。對應(yīng)代碼,就是調(diào)用join方法阻塞主線程。
有人問了,不阻塞主線程行不行???
不行!!!
從代碼來看的話,run方法不執(zhí)行完,屬性chuju就沒有被賦值,還是null,后面執(zhí)行就會出現(xiàn)空指針異常。換句話說,沒有廚具,怎么做飯。
Java現(xiàn)在的多線程機制,核心方法run是沒有返回值的;如果要保存run方法里面的計算結(jié)果,必須等待run方法計算完,無論計算過程多么耗時。
面對這種尷尬的處境,程序員就會想:在子線程run方法計算的期間,能不能在主線程里面繼續(xù)異步執(zhí)行???
Where there is a will,there is a way?。?!
這種想法的核心就是Future模式,下面先應(yīng)用一下Java自己實現(xiàn)的Future模式。
模擬代碼2:
package test;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureCook {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
// 第一步 網(wǎng)購廚具
Callable<Chuju> onlineShopping = new Callable<Chuju>() {
@Override
public Chuju call() throws Exception {
System.out.println("第一步:下單");
System.out.println("第一步:等待送貨");
Thread.sleep(5000); // 模擬送貨時間
System.out.println("第一步:快遞送到");
return new Chuju();
}
};
FutureTask<Chuju> task = new FutureTask<Chuju>(onlineShopping);
new Thread(task).start();
// 第二步 去超市購買食材
Thread.sleep(2000); // 模擬購買食材時間
Shicai shicai = new Shicai();
System.out.println("第二步:食材到位");
// 第三步 用廚具烹飪食材
if (!task.isDone()) { // 聯(lián)系快遞員,詢問是否到貨
System.out.println("第三步:廚具還沒到,心情好就等著(心情不好就調(diào)用cancel方法取消訂單)");
}
Chuju chuju = task.get();
System.out.println("第三步:廚具到位,開始展現(xiàn)廚藝");
cook(chuju, shicai);
System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");
}
// 用廚具烹飪食材
static void cook(Chuju chuju, Shicai shicai) {}
// 廚具類
static class Chuju {}
// 食材類
static class Shicai {}
}
運行結(jié)果:
第一步:下單
第一步:等待送貨
第二步:食材到位
第三步:廚具還沒到,心情好就等著(心情不好就調(diào)用cancel方法取消訂單)
第一步:快遞送到
第三步:廚具到位,開始展現(xiàn)廚藝
總共用時5005ms
?可以看見,在快遞員送廚具的期間,我們沒有閑著,可以去買食材;而且我們知道廚具到?jīng)]到,甚至可以在廚具沒到的時候,取消訂單不要了。
好神奇,有沒有。
當然你可能會說模擬代碼1中,只要join在cook(thread.chuju, shicai);就可以了,但是用future應(yīng)該來說從編程角度更加自然。
比如模擬二的邏輯:
if (!task.isDone()) { // 聯(lián)系快遞員,詢問是否到貨
System.out.println("第三步:廚具還沒到,心情好就等著(心情不好就調(diào)用cancel方法取消訂單)");
}
Chuju chuju = task.get();
等價于模擬一:
thread.start();
// 第二步 去超市購買食材
Thread.sleep(2000); // 模擬購買食材時間
Shicai shicai = new Shicai();
System.out.println("第二步:食材到位");
thread.join(); // 保證廚具送到 ---------------join應(yīng)該在購買食材開始后,而不是之前
用join這種寫法也可以達到future的效果,但是使用Future不僅僅是更自然,而且確實是增加了靈活性,比如任務(wù)完成與否的判斷,任務(wù)的取消等。這些是join不能做到的。
下面具體分析一下第二段代碼:
1)把耗時的網(wǎng)購廚具邏輯,封裝到了一個Callable的call方法里面。
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable接口可以看作是Runnable接口的補充,call方法帶有返回值,并且可以拋出異常。
?
2)把Callable實例當作參數(shù),生成一個FutureTask的對象,然后把這個對象當作一個Runnable,作為參數(shù)另起線程。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
這個繼承體系中的核心接口是Future。Future的核心思想是:一個方法f,計算過程可能非常耗時,等待f返回,顯然不明智。可以在調(diào)用f的時候,立馬返回一個Future,可以通過Future這個數(shù)據(jù)結(jié)構(gòu)去控制方法f的計算過程。
這里的控制包括:
get方法:獲取計算結(jié)果(如果還沒計算完,也是必須等待的)
cancel方法:還沒計算完,可以取消計算過程
isDone方法:判斷是否計算完
isCancelled方法:判斷計算是否被取消
這些接口的設(shè)計很完美,FutureTask的實現(xiàn)注定不會簡單,后面再說。
?
3)在第三步里面,調(diào)用了isDone方法查看狀態(tài),然后直接調(diào)用task.get方法獲取廚具,不過這時還沒送到,所以還是會等待3秒。對比第一段代碼的執(zhí)行結(jié)果,這里我們節(jié)省了2秒。這是因為在快遞員送貨期間,我們?nèi)コ匈徺I食材,這兩件事在同一時間段內(nèi)異步執(zhí)行。
二、為什么出現(xiàn)Future機制
常見的兩種創(chuàng)建線程的方式。一種是直接繼承Thread,另外一種就是實現(xiàn)Runnable接口。
這兩種方式都有一個缺陷就是:在執(zhí)行完任務(wù)之后無法獲取執(zhí)行結(jié)果。
從Java 1.5開始,就提供了Callable和Future,通過它們可以在任務(wù)執(zhí)行完畢之后得到任務(wù)執(zhí)行結(jié)果。
Future模式的核心思想是能夠讓主線程將原來需要同步等待的這段時間用來做其他的事情。(因為可以異步獲得執(zhí)行結(jié)果,所以不用一直同步等待去獲得執(zhí)行結(jié)果)
上圖簡單描述了不使用Future和使用Future的區(qū)別,不使用Future模式,主線程在invoke完一些耗時邏輯之后需要等待,這個耗時邏輯在實際應(yīng)用中可能是一次RPC調(diào)用,可能是一個本地IO操作等。B圖表達的是使用Future模式之后,我們主線程在invoke之后可以立即返回,去做其他的事情,回頭再來看看剛才提交的invoke有沒有結(jié)果。
2.1 Future 類有什么用?
Future?類是異步思想的典型運用,主要用在一些需要執(zhí)行耗時任務(wù)的場景,避免程序一直原地等待耗時任務(wù)執(zhí)行完成,執(zhí)行效率太低。具體來說是這樣的:當我們執(zhí)行某一耗時的任務(wù)時,可以將這個耗時任務(wù)交給一個子線程去異步執(zhí)行,同時我們可以干點其他事情,不用傻傻等待耗時任務(wù)執(zhí)行完成。等我們的事情干完后,我們再通過?Future?類獲取到耗時任務(wù)的執(zhí)行結(jié)果。這樣一來,程序的執(zhí)行效率就明顯提高了。
這其實就是多線程中經(jīng)典的?Future 模式,你可以將其看作是一種設(shè)計模式,核心思想是異步調(diào)用,主要用在多線程領(lǐng)域,并非 Java 語言獨有。
在 Java 中,F(xiàn)uture?類只是一個泛型接口,位于?java.util.concurrent?包下,其中定義了 5 個方法,主要包括下面這 4 個功能:
- 取消任務(wù);
- 判斷任務(wù)是否被取消;
- 判斷任務(wù)是否已經(jīng)執(zhí)行完成;
- 獲取任務(wù)執(zhí)行結(jié)果。
// V 代表了Future執(zhí)行的任務(wù)返回值的類型
public interface Future<V> {
// 取消任務(wù)執(zhí)行
// 成功取消返回 true,否則返回 false
boolean cancel(boolean mayInterruptIfRunning);
// 判斷任務(wù)是否被取消
boolean isCancelled();
// 判斷任務(wù)是否已經(jīng)執(zhí)行完成
boolean isDone();
// 獲取任務(wù)執(zhí)行結(jié)果
V get() throws InterruptedException, ExecutionException;
// 指定時間內(nèi)沒有返回計算結(jié)果就拋出 TimeOutException 異常
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutExceptio
}
簡單理解就是:我有一個任務(wù),提交給了?Future?來處理。任務(wù)執(zhí)行期間我自己可以去做任何想做的事情。并且,在這期間我還可以取消任務(wù)以及獲取任務(wù)的執(zhí)行狀態(tài)。一段時間之后,我就可以?Future?那里直接取出任務(wù)執(zhí)行結(jié)果。
三、Future的相關(guān)類圖
2.1 Future 接口
首先,我們需要清楚,F(xiàn)utrue是個接口。Future就是對于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進行取消、查詢是否完成、獲取結(jié)果。必要時可以通過get方法獲取執(zhí)行結(jié)果,該方法會阻塞直到任務(wù)返回結(jié)果。
接口定義行為,我們通過上圖可以看到實現(xiàn)Future接口的子類會具有哪些行為:
- 我們可以取消這個執(zhí)行邏輯,如果這個邏輯已經(jīng)正在執(zhí)行,提供可選的參數(shù)來控制是否取消已經(jīng)正在執(zhí)行的邏輯。
- 我們可以判斷執(zhí)行邏輯是否已經(jīng)被取消。
- 我們可以判斷執(zhí)行邏輯是否已經(jīng)執(zhí)行完成。
- 我們可以獲取執(zhí)行邏輯的執(zhí)行結(jié)果。
- 我們可以允許在一定時間內(nèi)去等待獲取執(zhí)行結(jié)果,如果超過這個時間,拋TimeoutException。
2.2 FutureTask 類
類圖如下:
FutureTask是Future的具體實現(xiàn)。FutureTask實現(xiàn)了RunnableFuture接口。RunnableFuture接口又同時繼承了Future 和 Runnable 接口。所以FutureTask既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值。
五、FutureTask源碼分析
5.1 state字段
volatile修飾的state字段;表示FutureTask當前所處的狀態(tài)。可能的轉(zhuǎn)換過程見注釋。
?
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL(創(chuàng)建到正常運行結(jié)束的狀態(tài)變化軌跡)
* NEW -> COMPLETING -> EXCEPTIONAL(創(chuàng)建到異常運行結(jié)束的狀態(tài)變化軌跡)
* NEW -> CANCELLED (創(chuàng)建到取消的狀態(tài)變化軌跡)
* NEW -> INTERRUPTING -> INTERRUPTED(創(chuàng)建到中斷結(jié)束的狀態(tài)變化軌跡)
*/
private volatile int state;
// NEW 新建狀態(tài),表示這個 FutureTask還沒有開始運行
private static final int NEW = 0;
// COMPLETING 完成狀態(tài), 表示 FutureTask 任務(wù)已經(jīng)計算完畢了
// 但是還有一些后續(xù)操作,例如喚醒等待線程操作,還沒有完成。
private static final int COMPLETING = 1;
// FutureTask 任務(wù)完結(jié),正常完成,沒有發(fā)生異常
private static final int NORMAL = 2;
// FutureTask 任務(wù)完結(jié),因為發(fā)生異常。
private static final int EXCEPTIONAL = 3;
// FutureTask 任務(wù)完結(jié),因為取消任務(wù)
private static final int CANCELLED = 4;
// FutureTask 任務(wù)完結(jié),也是取消任務(wù),不過發(fā)起了中斷運行任務(wù)線程的中斷請求
private static final int INTERRUPTING = 5;
// FutureTask 任務(wù)完結(jié),也是取消任務(wù),已經(jīng)完成了中斷運行任務(wù)線程的中斷請求
private static final int INTERRUPTED = 6;
?
5.2 其他變量
/** 任務(wù) */
private Callable<V> callable;
/** 儲存結(jié)果*/
private Object outcome; // non-volatile, protected by state reads/writes
/** 執(zhí)行任務(wù)的線程*/
private volatile Thread runner;
/** get方法阻塞的線程隊列 */
private volatile WaitNode waiters;
//FutureTask的內(nèi)部類,get方法的等待隊列
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
?
5.3 CAS工具初始化
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
這段代碼是為了后面使用CAS而準備的。可以這么理解:
一個java對象可以看成是一段內(nèi)存,各個字段都得按照一定的順序放在這段內(nèi)存里,同時考慮到對齊要求,可能這些字段不是連續(xù)放置的,用這個UNSAFE.objectFieldOffset()方法能準確地告訴你某個字段相對于對象的起始內(nèi)存地址的字節(jié)偏移量,因為是相對偏移量,所以它其實跟某個具體對象又沒什么太大關(guān)系,跟class的定義和虛擬機的內(nèi)存模型的實現(xiàn)細節(jié)更相關(guān)。
5.4 構(gòu)造函數(shù)
FutureTask有兩個構(gòu)造函數(shù):
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
這兩個構(gòu)造函數(shù)區(qū)別在于,如果使用第一個構(gòu)造函數(shù)最后獲取線程實行結(jié)果就是callable的執(zhí)行的返回結(jié)果;而如果使用第二個構(gòu)造函數(shù)那么最后獲取線程實行結(jié)果就是參數(shù)中的result,接下來讓我們看一下FutureTask的run方法。
同時兩個構(gòu)造函數(shù)都把當前狀態(tài)設(shè)置為NEW。
5.5 jdk1.8和之前版本的區(qū)別
run方法:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; // 這里的callable是從構(gòu)造方法里面?zhèn)魅说? if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 保存call方法拋出的異常
}
if (ran)
set(result); // 保存call方法的執(zhí)行結(jié)果
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
1.8:
get方法的邏輯很簡單,如果call方法的執(zhí)行過程已完成,就把結(jié)果給出去;如果未完成,就將當前線程掛起等待。awaitDone方法里面死循環(huán)的邏輯,推演幾遍就能弄懂;它里面掛起線程的主要創(chuàng)新是定義了WaitNode類,來將多個等待線程組織成隊列,這是與JDK6的實現(xiàn)最大的不同。
1.6:
JDK6的FutureTask的基本操作都是通過自己的內(nèi)部類Sync來實現(xiàn)的,而Sync繼承自AbstractQueuedSynchronizer這個出鏡率極高的并發(fā)工具類
也就是說1.8自己定義了一個WaitNode類,來將多個等待線程組織成隊列,在以前使用AQS來實現(xiàn)的
這個get方法里面處理等待線程隊列的方式是調(diào)用了acquireSharedInterruptibly方法,看過我之前幾篇博客文章的讀者應(yīng)該非常熟悉了。其中的等待線程隊列、線程掛起和喚醒等邏輯,這里不再贅述,如果不明白,請出門左轉(zhuǎn)。
六、Callable 和 Future 有什么關(guān)系?
我們可以通過?FutureTask?來理解?Callable?和?Future?之間的關(guān)系。
FutureTask?提供了?Future?接口的基本實現(xiàn),常用來封裝?Callable?和?Runnable,具有取消任務(wù)、查看任務(wù)是否執(zhí)行完成以及獲取任務(wù)執(zhí)行結(jié)果的方法。ExecutorService.submit()?方法返回的其實就是?Future?的實現(xiàn)類?FutureTask?。
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
FutureTask?不光實現(xiàn)了?Future接口,還實現(xiàn)了Runnable?接口,因此可以作為任務(wù)直接被線程執(zhí)行。
FutureTask?有兩個構(gòu)造函數(shù),可傳入?Callable?或者?Runnable?對象。實際上,傳入?Runnable?對象也會在方法內(nèi)部轉(zhuǎn)換為Callable?對象。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
// 通過適配器RunnableAdapter來將Runnable對象runnable轉(zhuǎn)換成Callable對象
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
FutureTask相當于對Callable?進行了封裝,管理著任務(wù)執(zhí)行的情況,存儲了?Callable?的?call?方法的任務(wù)執(zhí)行結(jié)果。
七、CompletableFuture 類
Future 在實際使用過程中存在一些局限性,比如不支持異步任務(wù)的編排組合、獲取計算結(jié)果的 get() 方法為阻塞調(diào)用。
Java 8 才被引入CompletableFuture 類可以解決Future 的這些缺陷。CompletableFuture 除了提供了更為好用和強大的 Future 特性之外,還提供了函數(shù)式編程、異步任務(wù)編排組合(可以將多個異步任務(wù)串聯(lián)起來,組成一個完整的鏈式調(diào)用)等能力。
下面我們來簡單看看 CompletableFuture 類的定義。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
可以看到,CompletableFuture?同時實現(xiàn)了?Future?和?CompletionStage?接口。
CompletionStage 接口描述了一個異步計算的階段。很多計算可以分成多個階段或步驟,此時可以通過它將所有步驟組合起來,形成異步計算的流水線。
CompletionStage 接口中的方法比較多,CompletableFuture 的函數(shù)式能力就是這個接口賦予的。從這個接口的方法參數(shù)你就可以發(fā)現(xiàn)其大量使用了 Java8 引入的函數(shù)式編程。文章來源:http://www.zghlxwxcb.cn/news/detail-770348.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-770348.html
到了這里,關(guān)于【并發(fā)編程】Java的Future機制詳解(Future接口和FutureTask類)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!