引言
上篇介紹了 ThreadPoolExecutor
配置和擴展相關的信息,本篇開始將介紹遞歸算法的并行化。
還記得我們在《Java并發(fā)編程學習11-任務執(zhí)行演示》中,對頁面繪制程序進行一系列改進,這些改進大大地提供了頁面繪制的并行性。
我們簡單回顧下相關的改進過程:
- 第一次新增時,頁面繪制程序完全是串行執(zhí)行;
- 第二次改進時,雖然用了兩個線程,并行地執(zhí)行了兩個不同類型的任務【下載圖像和渲染文本】,但它們?nèi)匀皇谴械叵螺d所有圖像。
- 最后一次改進,將每個圖像的下載操作視為一個獨立任務,實現(xiàn)了更高的并發(fā)性。
從上面的改進過程中,我們可以看出:
如果循環(huán)中的迭代操作都是獨立的,并且不需要等待所有的迭代操作都完成再繼續(xù)執(zhí)行,那么就可以使用
Executor
將串行循環(huán)轉(zhuǎn)換為并行循環(huán)。
1. 串行循環(huán)轉(zhuǎn)并行循環(huán)
下面我們來看一下如下的示例【將串行執(zhí)行轉(zhuǎn)換為并行執(zhí)行】:
public class Process {
/**
* 串行循環(huán)
*
* @param elements 待處理的數(shù)據(jù)列表
*/
public static void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
/**
* 并行循環(huán)
*
* @param exec 線程池對象
* @param elements 待處理的數(shù)據(jù)列表
*/
public static void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
@Override
public void run() {
process(e);
}
});
}
private static void process(Element e) {
// 處理單個數(shù)據(jù)
}
}
在上述的示例中,processInParallel
方法會在所有下載任務都進入了 Executor
的隊列后就立即返回,而不會等待這些任務全部完成,因此調(diào)用 processInParallel
比調(diào)用 processSequentially
能更快地返回。
當串行循環(huán)中的各個迭代操作之間彼此獨立,并且每個迭代操作執(zhí)行的工作量比管理一個新任務時帶來的開銷更多,那么這個串行循環(huán)就適合并行化。
2. 串行遞歸轉(zhuǎn)并行遞歸
在遞歸的算法中通常都會存在串行循環(huán),這就可以用上面 1 中的方式進行并行化。
如果在每個迭代操作中,都不需要來自后續(xù)遞歸迭代的結(jié)果,那可以參考下面的 parallelRecursive
方法來對遞歸進行并行化改進:
public class Process {
/**
* 串行遞歸
*
* @param nodes 樹節(jié)點集合
* @param results 結(jié)果集合
* @param <T> 樹中元素的類型
*/
public static <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
/**
* 并行遞歸
*
* @param exec 線程池對象
* @param nodes 樹節(jié)點集合
* @param results 結(jié)果集合
* @param <T> 樹中元素的類型
*/
public static <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
@Override
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
}
上述示例中,
-
串行遞歸
sequentialRecursive
方法,用深度優(yōu)先算法遍歷一棵樹,在每個節(jié)點上執(zhí)行計算并將結(jié)果放到一個集合里 -
并行遞歸
parallelRecursive
方法,同樣用深度優(yōu)先遍歷,但它并不是在訪問節(jié)點時進行計算,而是為每個節(jié)點提交一個任務來完成計算。
當 parallelRecursive
返回時,樹中的各個節(jié)點都已經(jīng)訪問過了,并且每個節(jié)點的計算任務也已經(jīng)放入 Executor 的工作隊列。
注意:
parallelRecursive
中遍歷樹的過程仍然是串行的,只有樹節(jié)點的計算操作才是并行執(zhí)行的。
既然上面樹節(jié)點計算已經(jīng)并行,那么 sequentialRecursive
方法的調(diào)用者該如何獲取所有的結(jié)果呢???
這就需要創(chuàng)建一個特定于遍歷過程的 Executor
,并使用 shutdown
和 awaitTermination
等方法。
下面我們來看一下如下的示例【等待通過并行方式計算的結(jié)果】:
/**
* 等待通過并行方式計算的結(jié)果
*
* @param nodes 樹節(jié)點集合
* @param <T> 樹中元素的類型
* @return 計算結(jié)果集合
*/
public static <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedDeque<>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
3. 謎題
我們已經(jīng)從上面初步了解了串行轉(zhuǎn)并行的一些內(nèi)容,其實這項技術(shù)的一個重要應用就是解決一些謎題,例如 “搬箱子”、“Hi-Q”、“四色方柱” 和 棋牌謎題等,這些謎題都需要找出一系列的操作從初始狀態(tài)轉(zhuǎn)換到目標狀態(tài)。
現(xiàn)在我們給出謎題的定義,包含如下:
- 一個初始位置
- 一個目標位置
- 一個用于判斷是否有效移動的規(guī)則集。它包含兩部分:
- 計算從指定位置開始的所有合法移動
- 每次移動的結(jié)果位置
下面我們來看一下如下的示例【它表示 “搬箱子” 之類謎題的接口類】:
public interface Puzzle<P, M> {
P initialPosition();
boolean isGoal(P position);
Set<M> legalMoves(P position);
P move(P position, M move);
}
上述 Puzzle
表示謎題的接口類,其中的類型參數(shù) P
和 M
表示位置類和移動類。
有了謎題的定義,我們再來看看謎題位置的定義【它用于謎題解決框架的鏈表節(jié)點】:
@Immutable
public class Node<P, M> {
final P pos;
final M move;
final Node<P, M> prev;
public Node(P pos, M move, Node<P, M> prev) {
this.pos = pos;
this.move = move;
this.prev = prev;
}
public List<M> asMoveList() {
List<M> solution = new LinkedList<>();
for (Node<P, M> n = this; n.move != null; n = n.prev)
solution.add(0, n.move);
return solution;
}
}
上述示例中,Node
代表通過一系列的移動到達的一個位置,其中保存了到達該位置的移動以及前一個 Node
。只要沿著 Node
鏈接逐步回溯,就可以重新構(gòu)建出到達當前位置的移動序列。
3.1 串行的謎題解答器
有了 Puzzle
和 Node
,我們現(xiàn)在可以寫一個簡單的謎題框架的串行求解程序,該程序?qū)⒃谥i題空間中執(zhí)行一個深度優(yōu)先搜索,直到找到一個解答(當然這不一定是最短的解決方案)或者找遍了整個空間都沒有發(fā)現(xiàn)答案。
下面我們來看一下如下的示例【串行的謎題解答器】:
public class SequentialPuzzleSolver<P, M> {
private final Puzzle<P, M> puzzle;
private final Set<P> seen = new HashSet<>();
public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
}
public List<M> solve() {
P pos = puzzle.initialPosition();
return search(new Node<P, M>(pos, null, null));
}
private List<M> search(Node<P, M> node) {
if (!seen.contains(node.pos)) {
seen.add(node.pos);
if (puzzle.isGoal(node.pos))
return node.asMoveList();
for (M move : puzzle.legalMoves(node.pos)) {
P pos = puzzle.move(node.pos, move);
Node<P, M> child = new Node<P, M>(pos, move, node);
List<M> result = search(child);
if (result != null)
return result;
}
}
return null;
}
}
3.2 并發(fā)的謎題解答器
上面 3.1 中我們已經(jīng)介紹了串行的謎題解答器 SequentialPuzzleSolver
,那么下面我們來分析看看它哪里有可以利用的并發(fā)改進?
簡單分析下,計算某次移動的過程在很大程度上與計算其他移動的過程是相互獨立的,因此我們可以以并行方式來計算下一步移動以及目標條件。
當然這里說“很大程度上”,是因為在各個任務之間會共享一些可變狀態(tài),例如已遍歷位置的集合。
下面我們來看一下如下的示例【并發(fā)的謎題解答器】:
public class ConcurrentPuzzleSolver<P, M> {
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P, Boolean> seen;
final ValueLatch<Node<P, M>> solution = new ValueLatch<>();
public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec, ConcurrentMap<P, Boolean> seen) {
this.puzzle = puzzle;
this.exec = exec;
this.seen = seen;
}
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// 阻塞直到找到解答
Node<P, M> solnNode = solution.getValue();
return (solnNode == null) ? null : solnNode.asMoveList();
} finally {
exec.shutdown();
}
}
protected Runnable newTask(P p, M m, Node<P, M> n) {
return new SolverTask(p, m, n);
}
private class SolverTask extends Node<P, M> implements Runnable {
SolverTask(P p, M m, Node<P, M> n) {
super(p, m, n);
}
@Override
public void run() {
if (solution.isSet() || seen.putIfAbsent(pos, true) != null)
return; // 已經(jīng)找到了解答 或者 已經(jīng)遍歷了這個位置
if (puzzle.isGoal(pos))
solution.setValue(this);
else
for (M m : puzzle.legalMoves(pos))
exec.execute(newTask(puzzle.move(pos, m), m, this));
}
}
}
在上面的并發(fā)的謎題解答器 ConcurrentPuzzleSolver
中,我們使用了一個內(nèi)部類 SolverTask
,該類擴展了 Node
并實現(xiàn)了 Runnable
接口,其中它的 run
方法實現(xiàn)了如下的功能:
- 首先計算出下一步可能到達的所有位置,并去掉已經(jīng)達到的位置;
- 然后判斷(這個任務或者其他某個任務)是否已經(jīng)成功地完成;
- 最后將尚未搜索過的位置提交給
Executor
。
還記得上面我們在串行版本中引入了一個 Set
對象,它的作用就是為了避免無限循環(huán),其中保存了之前已經(jīng)搜索過的所有位置信息。
同樣在 ConcurrentPuzzleSolver
中,我們使用 ConCurrentHashMap
也實現(xiàn)了相同的功能。這種做法不僅提供了線程安全性,還避免了在更新共享集合時存在的競態(tài)條件,因為 putIfAbsent
只有在之前沒有遍歷過的某個位置才會通過原子方式添加到集合中。
上述串行版本的謎題解答器,執(zhí)行深度優(yōu)先搜索,因此搜索過程將受限于棧的大小。而并發(fā)版本的謎題解答器執(zhí)行廣度優(yōu)先搜索,因此不會受到棧大小的限制(但如果待搜索的或者已搜索的位置集合大小超過了可用的內(nèi)存總量,那么仍可能耗盡內(nèi)存)。
下面我們來思考一下,并發(fā)場景下,我們?nèi)绾尾拍茉谡业侥硞€解答后停止搜索呢???
很顯然,這個時候就需要通過某種方式來檢查是否有線程已經(jīng)找到了一個解答。
細心的讀者可能已經(jīng)發(fā)現(xiàn)了,在 ConcurrentPuzzleSolver
中我們定義了 ValueLatch
,它是使用 CountDownLatch
來實現(xiàn)所需的閉鎖行為,并且使用鎖定機制來確保解答只會被設置一次。
@ThreadSafe
public class ValueLatch<T> {
@GuardedBy("this")
private T value = null;
private final CountDownLatch done = new CountDownLatch(1);
public boolean isSet() {
return (done.getCount() == 0);
}
public synchronized void setValue(T newValue) {
if (!isSet()) {
value = newValue;
done.countDown();
}
}
public T getValue() throws InterruptedException {
done.await();
synchronized (this) {
return value;
}
}
}
每個任務首先查詢 solution
閉鎖,找到一個解答就停止。而在此之前,主線程需要等待,ValueLatch
中的 getValue
將一直阻塞,直到有線程設置了這個值。
ValueLatch
提供了一種方式來保存這個值,只有第一次調(diào)用才會設置它。調(diào)用者能夠判斷這個值是否已經(jīng)被設置,以及阻塞并等候它被設置。在第一次調(diào)用 setValue
時,將更新解答方案,并且 CountDownLatch
會遞減,從 getValue
中釋放主線程。
第一個找到解答的線程還會關閉 Executor
,從而阻止接受新的任務。如果要避免處理 RejectedExecutionException
,需要將拒絕執(zhí)行處理器設置為 “拋棄已提交的任務”。然后,所有未完成的任務最終將執(zhí)行完成,并且在執(zhí)行任何新任務時都會失敗,從而使 Executor
結(jié)束。
3.3 無解答的并發(fā)解答器
講到這里都是說的有某個解答的情況,如果謎題本身就不存在解答的話,那 ConcurrentPuzzleSolver
就無法很好地處理這種情況了:如果已經(jīng)遍歷了所有的移動位置都沒有找到解答,那么在 getValue 調(diào)用中將永遠等待下去。
那么并發(fā)場景下,如果沒有解答,有沒有什么方法可以結(jié)束程序呢?
有一種方法就是記錄活動任務的數(shù)量,當該值為零時將解答設置為 null
。
下面我們來看一下如下的示例【在解答器中找不到解答的場景】:
public class PuzzleSolver<P, M> extends ConcurrentPuzzleSolver<P, M> {
private final AtomicInteger taskCount = new AtomicInteger(0);
public PuzzleSolver(Puzzle<P, M> puzzle, ExecutorService exec, ConcurrentMap<P, Boolean> seen) {
super(puzzle, exec, seen);
}
protected Runnable newTask(P p, M m, Node<P, M> n) {
return new CountingSolverTask(p, m, n);
}
class CountingSolverTask extends SolverTask {
CountingSolverTask(P pos, M move, Node<P, M> prev) {
super(pos, move, prev);
taskCount.incrementAndGet();
}
@Override
public void run() {
try {
super.run();
} finally {
if (taskCount.decrementAndGet() == 0)
solution.setValue(null);
}
}
}
}
3.4 進一步的改進
我們知道,真實解題時,找到解答的時間可能比等待的時間要長,因此在解答器中還需要包含下面的結(jié)束條件:文章來源:http://www.zghlxwxcb.cn/news/detail-527288.html
- 時間限制。這種可以在
ValueLatch
中實現(xiàn)一個限時的getValue
(其中將使用限時版本的await
),如果getValue
超時,那么關閉Executor
并聲明出現(xiàn)一個失敗。 - 達到最大搜索深度或步數(shù)。為了避免無限循環(huán)或無限搜索的情況,可以設置一個最大搜索深度或步數(shù)作為結(jié)束條件。當解答器達到了這個限制時,搜索將終止
- 用戶中斷。在一些交互式的謎題解答環(huán)境中,用戶可以隨時中斷解答器的執(zhí)行,提前結(jié)束搜索過程
總結(jié)
對于可以并發(fā)執(zhí)行的任務,Executor
框架提供了大量可調(diào)節(jié)的選項,例如創(chuàng)建線程和關閉線程的策略,處理隊列任務的策略,處理過多任務的策略,并且提供了幾個鉤子方法來擴展它的行為。通過使用這些可調(diào)節(jié)的選項,我們可以根據(jù)具體需求來配置和擴展 Executor
框架的行為,以滿足不同的并發(fā)處理需求。文章來源地址http://www.zghlxwxcb.cn/news/detail-527288.html
到了這里,關于Java并發(fā)編程學習18-線程池的使用(下)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!