三、CompletableFutrue
一個商品詳情頁
- 展示SKU的基本信息 0.5s
- 展示SKU的圖片信息 0.6s
- 展示SKU的銷售信息 1s
- spu的銷售屬性 1s
- 展示規(guī)格參數(shù) 1.5s
- spu詳情信息 1s
1.ComplatableFuture介紹
??Future是Java 5添加的類,用來描述一個異步計算的結(jié)果。你可以使用 isDone
方法檢查計算是否完成,或者使用 get
阻塞住調(diào)用線程,直到計算完成返回結(jié)果,你也可以使用 cancel
方法停止任務(wù)的執(zhí)行。
??雖然 Future
以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時地得到計算結(jié)果,為什么不能用觀察者設(shè)計模式當(dāng)計算結(jié)果完成及時通知監(jiān)聽者呢?
??很多語言,比如Node.js,采用回調(diào)的方式實現(xiàn)異步編程。Java的一些框架,比如Netty,自己擴展了Java的 Future
接口,提供了 addListener
等多個擴展方法;Google guava也提供了通用的擴展Future;Scala也提供了簡單易用且功能強大的Future/Promise異步編程模式。
??作為正統(tǒng)的Java類庫,是不是應(yīng)該做點什么,加強一下自身庫的功能呢?
??在Java 8中, 新增加了一個包含50個方法左右的類: CompletableFuture,提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計算結(jié)果,并且提供了轉(zhuǎn)換和組合CompletableFuture的方法。
??CompletableFuture類實現(xiàn)了Future接口,所以你還是可以像以前一樣通過 get
方法阻塞或者輪詢的方式獲得結(jié)果,但是這種方式不推薦使用。
??CompletableFuture和FutureTask同屬于Future接口的實現(xiàn)類,都可以獲取線程的執(zhí)行結(jié)果。
2.創(chuàng)建異步對象
CompletableFuture 提供了四個靜態(tài)方法來創(chuàng)建一個異步操作。
static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
方法分為兩類:
- runAsync 沒有返回結(jié)果
- supplyAsync 有返回結(jié)果
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
,50
,10
, TimeUnit.SECONDS
,new LinkedBlockingQueue<>(100)
, Executors.defaultThreadFactory()
,new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main -- 線程開始了...");
// 獲取CompletableFuture對象
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("線程開始了...");
int i = 100/50;
System.out.println("線程結(jié)束了...");
},executor);
System.out.println("main -- 線程結(jié)束了...");
System.out.println("------------");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("線程開始了...");
int i = 100 / 50;
System.out.println("線程結(jié)束了...");
return i;
}, executor);
System.out.println("獲取的線程的返回結(jié)果是:" + future.get() );
}
3.whenXXX和handle方法
??當(dāng)CompletableFuture的計算結(jié)果完成,或者拋出異常的時候,可以執(zhí)行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) ;
相關(guān)方法的說明:
- whenComplete 可以獲取異步任務(wù)的返回值和拋出的異常信息,但是不能修改返回結(jié)果
- execptionlly 當(dāng)異步任務(wù)跑出了異常后會觸發(fā)的方法,如果沒有拋出異常該方法不會執(zhí)行
- handle 可以獲取異步任務(wù)的返回值和拋出的異常信息,而且可以顯示的修改返回的結(jié)果
/**
* CompletableFuture的介紹
*/
public class CompletableFutureDemo2 {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
,50
,10
, TimeUnit.SECONDS
,new LinkedBlockingQueue<>(100)
, Executors.defaultThreadFactory()
,new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("線程開始了...");
int i = 100 / 5;
System.out.println("線程結(jié)束了...");
return i;
}, executor).handle((res,exec)->{
System.out.println("res = " + res + ":exec="+exec);
return res * 10;
});
// 可以處理異步任務(wù)之后的操作
System.out.println("獲取的線程的返回結(jié)果是:" + future.get() );
}
/* public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("線程開始了...");
int i = 100 / 5;
System.out.println("線程結(jié)束了...");
return i;
}, executor).whenCompleteAsync((res,exec)->{
System.out.println("res = " + res);
System.out.println("exec = " + exec);
}).exceptionally((res)->{ // 在異步任務(wù)顯示的拋出了異常后才會觸發(fā)的方法
System.out.println("res = " + res);
return 10;
});
// 可以處理異步任務(wù)之后的操作
System.out.println("獲取的線程的返回結(jié)果是:" + future.get() );
}*/
/* public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("線程開始了...");
int i = 100 / 0;
System.out.println("線程結(jié)束了...");
return i;
}, executor).whenCompleteAsync((res,exec)->{
System.out.println("res = " + res);
System.out.println("exec = " + exec);
});
// 可以處理異步任務(wù)之后的操作
System.out.println("獲取的線程的返回結(jié)果是:" + future.get() );
}*/
}
4.線程串行方法
thenApply 方法:當(dāng)一個線程依賴另一個線程時,獲取上一個任務(wù)返回的結(jié)果,并返回當(dāng)前任務(wù)的返回值。
thenAccept方法:消費處理結(jié)果。接收任務(wù)的處理結(jié)果,并消費處理,無返回結(jié)果。
thenRun方法:只要上面的任務(wù)執(zhí)行完成,就開始執(zhí)行thenRun,只是處理完任務(wù)后,執(zhí)行 thenRun的后續(xù)操作
帶有Async默認是異步執(zhí)行的。這里所謂的異步指的是不在當(dāng)前線程內(nèi)執(zhí)行。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
/**
* CompletableFuture的介紹
*/
public class CompletableFutureDemo3 {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
,50
,10
, TimeUnit.SECONDS
,new LinkedBlockingQueue<>(100)
, Executors.defaultThreadFactory()
,new ThreadPoolExecutor.AbortPolicy()
);
/**
* 線程串行的方法
* thenRun:在前一個線程執(zhí)行完成后,開始執(zhí)行,不會獲取前一個線程的返回結(jié)果,也不會返回信息
* thenAccept:在前一個線程執(zhí)行完成后,開始執(zhí)行,獲取前一個線程的返回結(jié)果,不會返回信息
* thenApply: 在前一個線程執(zhí)行完成后。開始執(zhí)行,獲取前一個線程的返回結(jié)果,同時也會返回信息
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("線程開始了..." + Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("線程結(jié)束了..." + Thread.currentThread().getName());
return i;
}, executor).thenApply(res -> {
System.out.println("res = " + res);
return res * 100;
});
// 可以處理異步任務(wù)之后的操作
System.out.println("獲取的線程的返回結(jié)果是:" + future.get() );
}
/*public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("線程開始了..." + Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("線程結(jié)束了..." + Thread.currentThread().getName());
return i;
}, executor).thenAcceptAsync(res -> {
System.out.println(res + ":" + Thread.currentThread().getName());
}, executor);
// 可以處理異步任務(wù)之后的操作
//System.out.println("獲取的線程的返回結(jié)果是:" + future.get() );
}*/
/*public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("線程開始了..."+Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("線程結(jié)束了..."+Thread.currentThread().getName());
return i;
}, executor).thenRunAsync(() -> {
System.out.println("線程開始了..."+Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("線程結(jié)束了..."+Thread.currentThread().getName());
}, executor);
// 可以處理異步任務(wù)之后的操作
//System.out.println("獲取的線程的返回結(jié)果是:" + future.get() );
}*/
}
5.兩個都完成
??上面介紹的相關(guān)方法都是串行的執(zhí)行,接下來看看需要等待兩個任務(wù)執(zhí)行完成后才會觸發(fā)的幾個方法
- thenCombine :可以獲取前面兩線程的返回結(jié)果,本身也有返回結(jié)果
- thenAcceptBoth:可以獲取前面兩線程的返回結(jié)果,本身沒有返回結(jié)果
- runAfterBoth:不可以獲取前面兩線程的返回結(jié)果,本身也沒有返回結(jié)果
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)1 線程開始了..." + Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("任務(wù)1 線程結(jié)束了..." + Thread.currentThread().getName());
return i;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)2 線程開始了..." + Thread.currentThread().getName());
int i = 100 /10;
System.out.println("任務(wù)2 線程結(jié)束了..." + Thread.currentThread().getName());
return i;
}, executor);
// runAfterBothAsync 不能獲取前面兩個線程的返回結(jié)果,本身也沒有返回結(jié)果
CompletableFuture<Void> voidCompletableFuture = future1.runAfterBothAsync(future2, () -> {
System.out.println("任務(wù)3執(zhí)行了");
},executor);
// thenAcceptBothAsync 可以獲取前面兩個線程的返回結(jié)果,本身沒有返回結(jié)果
CompletableFuture<Void> voidCompletableFuture1 = future1.thenAcceptBothAsync(future2, (f1, f2) -> {
System.out.println("f1 = " + f1);
System.out.println("f2 = " + f2);
}, executor);
// thenCombineAsync: 既可以獲取前面兩個線程的返回結(jié)果,同時也會返回結(jié)果給阻塞的線程
CompletableFuture<String> stringCompletableFuture = future1.thenCombineAsync(future2, (f1, f2) -> {
return f1 + ":" + f2;
}, executor);
// 可以處理異步任務(wù)之后的操作
System.out.println("獲取的線程的返回結(jié)果是:" + stringCompletableFuture.get() );
}
6.兩個任務(wù)完成一個
??在上面5個基礎(chǔ)上我們來看看兩個任務(wù)只要有一個完成就會觸發(fā)任務(wù)3的情況
- runAfterEither:不能獲取完成的線程的返回結(jié)果,自身也沒有返回結(jié)果
- acceptEither:可以獲取線程的返回結(jié)果,自身沒有返回結(jié)果
- applyToEither:既可以獲取線程的返回結(jié)果,自身也有返回結(jié)果
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)1 線程開始了..." + Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("任務(wù)1 線程結(jié)束了..." + Thread.currentThread().getName());
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)2 線程開始了..." + Thread.currentThread().getName());
int i = 100 /10;
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務(wù)2 線程結(jié)束了..." + Thread.currentThread().getName());
return i+"";
}, executor);
// runAfterEitherAsync 不能獲取前面完成的線程的返回結(jié)果,自身也沒有返回結(jié)果
future1.runAfterEitherAsync(future2,()->{
System.out.println("任務(wù)3執(zhí)行了....");
},executor);
// acceptEitherAsync 可以獲取前面完成的線程的返回結(jié)果 自身沒有返回結(jié)果
future1.acceptEitherAsync(future2,(res)->{
System.out.println("res = " + res);
},executor);
// applyToEitherAsync 既可以獲取完成任務(wù)的線程的返回結(jié)果 自身也有返回結(jié)果
CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {
System.out.println("res = " + res);
return res + "-->OK";
}, executor);
// 可以處理異步任務(wù)之后的操作
System.out.println("獲取的線程的返回結(jié)果是:" + stringCompletableFuture.get() );
}
7.多任務(wù)組合
allOf:等待所有任務(wù)完成文章來源:http://www.zghlxwxcb.cn/news/detail-687118.html
anyOf:只要有一個任務(wù)完成文章來源地址http://www.zghlxwxcb.cn/news/detail-687118.html
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)1 線程開始了..." + Thread.currentThread().getName());
int i = 100 / 5;
System.out.println("任務(wù)1 線程結(jié)束了..." + Thread.currentThread().getName());
return i;
}, executor);
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)2 線程開始了..." + Thread.currentThread().getName());
int i = 100 /10;
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務(wù)2 線程結(jié)束了..." + Thread.currentThread().getName());
return i+"";
}, executor);
CompletableFuture<Object> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)3 線程開始了..." + Thread.currentThread().getName());
int i = 100 /10;
System.out.println("任務(wù)3 線程結(jié)束了..." + Thread.currentThread().getName());
return i+"";
}, executor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
anyOf.get();
System.out.println("主任務(wù)執(zhí)行完成..." + anyOf.get());
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.get();// 阻塞在這個位置,等待所有的任務(wù)執(zhí)行完成
System.out.println("主任務(wù)執(zhí)行完成..." + future1.get() + " :" + future2.get() + " :" + future3.get());
}
到了這里,關(guān)于【業(yè)務(wù)功能篇92】微服務(wù)-springcloud-多線程-異步處理-異步編排-CompletableFutrue的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!