前言
CompletableFuture是Java 8中引入的一個(gè)功能強(qiáng)大的Future實(shí)現(xiàn)類,它的字面翻譯是“可完成的Future”。 CompletableFuture對(duì)并發(fā)編程進(jìn)行了增強(qiáng),可以方便地將多個(gè)有一定依賴關(guān)系的異步任務(wù)以流水線的方式組合在一起,大大簡化多異步任務(wù)的開發(fā)。
CompletableFuture實(shí)現(xiàn)了兩個(gè)接口,一個(gè)是Future,另一個(gè)是CompletionStage,F(xiàn)uture表示異步任務(wù)的結(jié)果,而CompletionStage字面意思是完成階段,多個(gè)CompletionStage可以以流水線的方式組合起來,對(duì)于其中一個(gè)CompletionStage,它有一個(gè)計(jì)算任務(wù),但可能需要等待其他一個(gè)或多個(gè)階段完成才能開始,它完成后,可能會(huì)觸發(fā)其他階段開始運(yùn)行。
CompletableFuture的設(shè)計(jì)主要是為了解決Future的阻塞問題,并提供了豐富的API來支持函數(shù)式編程和流式編程,可以更方便地組合多個(gè)異步任務(wù),并處理它們的依賴關(guān)系和異常。這使得它在處理并發(fā)編程和異步編程時(shí)非常有用。
在使用CompletableFuture時(shí),可以創(chuàng)建它的實(shí)例,并通過其提供的各種方法(如thenApply、thenCompose、thenAccept等)來定義任務(wù)之間的依賴關(guān)系和執(zhí)行順序。同時(shí),CompletableFuture還提供了異常處理機(jī)制,可以更方便地處理任務(wù)執(zhí)行過程中可能出現(xiàn)的異常。
一、CompletableFuture基本用法
- 靜態(tài)方法supplyAsync
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
方法接受兩個(gè)參數(shù)supplier和executor,使用executor執(zhí)行supplier表示的任務(wù),返回一個(gè)CompletableFuture,調(diào)用后,任務(wù)被異步執(zhí)行,這個(gè)方法立即返回。
supplyAsync還有一個(gè)不帶executor參數(shù)的方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
沒有executor,任務(wù)被誰執(zhí)行呢?與系統(tǒng)環(huán)境和配置有關(guān),一般來說,如果可用的CPU核數(shù)大于2,會(huì)使用Java 7引入的Fork/Join任務(wù)執(zhí)行服務(wù),即ForkJoinPool.commonPool(),該任務(wù)執(zhí)行服務(wù)背后的工作線程數(shù)一般為CPU核數(shù)減1,即Runtime.getRuntime().availableProcessors()-1,否則,會(huì)使用ThreadPerTaskExecutor,它會(huì)為每個(gè)任務(wù)創(chuàng)建一個(gè)線程。
對(duì)于CPU密集型的運(yùn)算任務(wù),使用Fork/Join任務(wù)執(zhí)行服務(wù)是合適的,但對(duì)于一般的調(diào)用外部服務(wù)的異步任務(wù),F(xiàn)ork/Join可能是不合適的,因?yàn)樗牟⑿卸缺容^低,可能會(huì)讓本可以并發(fā)的多任務(wù)串行運(yùn)行,這時(shí),應(yīng)該提供Executor參數(shù)。文章來源:http://www.zghlxwxcb.cn/news/detail-845619.html
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureDemo {
private static ExecutorService executor = Executors.newFixedThreadPool(10);
private static Random rnd = new Random();
static int delayRandom(int min, int max) {
int milli = max > min ? rnd.nextInt(max - min) : 0;
try {
Thread.sleep(min + milli);
} catch (InterruptedException e) {
}
return milli;
}
static Callable<Integer> externalTask = () -> {
int time = delayRandom(20, 2000);
return time;
};
public static void master() {
Future<Integer> asyncRet = callExternalService();
try {
Integer ret = asyncRet.get();
System.out.println(ret);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static CompletableFuture<Integer> callExternalService(){
Supplier<Integer> supplierTask = () -> {
int time = delayRandom(20, 2000);
return time;
};
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(supplierTask);
return future;
}
public static void main(String[] args) throws Exception{
// master();
CompletableFuture<Integer> integerFuture = callExternalService();
boolean done = integerFuture.isDone();
Integer now = integerFuture.getNow(0);
System.out.println(now);
Integer result = integerFuture.get();
System.out.println(result);
now = integerFuture.getNow(0);
System.out.println(now);
// executor.shutdown();
}
}
二、使用CompletableFuture來調(diào)度執(zhí)行由JSON串定義的DAG
在這個(gè)例子中,我們創(chuàng)建了四個(gè)任務(wù):A、B、C 和 D。任務(wù)B依賴于任務(wù)A的結(jié)果,而任務(wù)D依賴于任務(wù)B和任務(wù)C的結(jié)果。我們使用thenApplyAsync來創(chuàng)建依賴鏈,并使用CompletableFuture.allOf來等待多個(gè)任務(wù)的完成。最后,我們使用get方法來獲取結(jié)果。文章來源地址http://www.zghlxwxcb.cn/news/detail-845619.html
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class DagScheduler {
public static void main(String[] args) throws ExecutionException, InterruptedException {
String dagJson = "{\"nodes\": [{\"id\": \"A\", \"task\": \"printA\"}, {\"id\": \"B\", \"task\": \"printB\", \"dependencies\": [\"A\"]}, {\"id\": \"C\", \"task\": \"printC\", \"dependencies\": [\"A\"]}, {\"id\": \"D\", \"task\": \"printD\", \"dependencies\": [\"B\", \"C\"]}]}";
Map<String, CompletableFuture<Void>> futures = new HashMap<>();
// 解析JSON串
JSONObject dagObject = JSONObject.parseObject(dagJson);
JSONArray nodesArray = dagObject.getJSONArray("nodes");
// 創(chuàng)建一個(gè)映射,用于存儲(chǔ)每個(gè)節(jié)點(diǎn)的CompletableFuture
Map<String, Node> nodeMap = new HashMap<>();
for (int i = 0; i < nodesArray.size(); i++) {
JSONObject nodeObj = nodesArray.getJSONObject(i);
String id = nodeObj.getString("id");
List<String> dependencies = new ArrayList<>();
if (nodeObj.containsKey("dependencies")) {
dependencies = nodeObj.getJSONArray("dependencies").toJavaList(String.class);
}
Node node = new Node(id, () -> executeTask(id), dependencies);
nodeMap.put(id, node);
}
// 構(gòu)建依賴關(guān)系并啟動(dòng)任務(wù)
for (Node node : nodeMap.values()) {
node.start(futures, nodeMap);
}
// 等待所有任務(wù)完成
CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).get();
System.out.println("All tasks completed.");
}
private static Void executeTask(String taskId) {
// 執(zhí)行任務(wù)的具體邏輯
System.out.println("Executing task: " + taskId);
// 模擬任務(wù)執(zhí)行時(shí)間
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
return null;
}
static class Node {
private final String id;
private final Supplier<Void> task;
private final List<String> dependencies;
private CompletableFuture<Void> future;
public Node(String id, Supplier<Void> task, List<String> dependencies) {
this.id = id;
this.task = task;
this.dependencies = dependencies;
}
public void start(Map<String, CompletableFuture<Void>> futures, Map<String, Node> nodeMap) {
List<CompletableFuture<Void>> depFutures = new ArrayList<>();
for (String depId : dependencies) {
CompletableFuture<Void> depFuture = futures.get(depId);
if (depFuture == null) {
throw new IllegalStateException("Unknown dependency: " + depId);
}
depFutures.add(depFuture);
}
if (depFutures.isEmpty()) {
// 沒有依賴,直接執(zhí)行任務(wù)
future = CompletableFuture.supplyAsync( task);
} else {
// 等待所有依賴完成后執(zhí)行任務(wù)
future = CompletableFuture.allOf(depFutures.toArray(new CompletableFuture[0])).thenRunAsync(()->executeTask(id));
}
futures.put(id, future);
}
}
}
到了這里,關(guān)于Java組合式異步編程CompletableFuture的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!