使用Parallel Stream時,在適當?shù)沫h(huán)境中,通過適當?shù)厥褂貌⑿卸燃墑e,可以在某些情況下獲得性能提升。
如果程序創(chuàng)建一個自定義ThreadPool,必須記住調(diào)用它的shutdown()方法來避免內(nèi)存泄漏。
Parallel Stream默認使用的線程池
如下代碼示例,Parallel Stream并行處理使用的線程池是ForkJoinPool.commonPool(),這個線程池是由整個應用程序共享的線程池。
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
如何自定義線程池
簡單示例
如下代碼示例說明如下:
- 使用的ForkJoinPool構造函數(shù)的并行級別為4。為了確定不同環(huán)境下的最佳值,需要進行一些實驗,但一個好的經(jīng)驗法則是根據(jù)CPU的核數(shù)選擇數(shù)值。
- 接下來,處理并行流的內(nèi)容,在reduce調(diào)用中對它們進行匯總。
這個簡單的示例可能不能充分說明使用自定義線程池的用處,但是在不希望將公共線程池與長時間運行的任務綁定在一起(例如處理來自網(wǎng)絡源的數(shù)據(jù))或應用程序中的其他組件正在使用公共線程池的情況下,其好處就很明顯了。
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
復雜點的示例
通過查看日志,可以看到使用了自定義的線程池,提高了并發(fā)處理的效率
@Test
public void testCustomThreadPool() throws ExecutionException, InterruptedException {
List<Long> firstRange = LongStream.rangeClosed(1, 10).boxed()
.collect(Collectors.toList());
List<Long> secondRange = LongStream.rangeClosed(5000, 6000).boxed()
.collect(Collectors.toList());
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
Future<Long> future = forkJoinPool.submit(() -> {
return firstRange.parallelStream().map((number) -> {
try {
print(Thread.currentThread().getName() +" 正在處理 "+number);
Thread.sleep(5);
} catch (InterruptedException e) {
}finally {
return number;
}
}).reduce(0L, Long::sum);
});
assertEquals((1 + 10) * 10 / 2, future.get());
forkJoinPool.shutdown();
ForkJoinPool forkJoinPool2 = new ForkJoinPool(10);
forkJoinPool2.submit(() -> {
secondRange.parallelStream().forEach((number) -> {
try {
print(Thread.currentThread().getName() +" 正在處理 "+number);
Thread.sleep(1);
} catch (InterruptedException e) {
}
});
});
forkJoinPool2.shutdown();
TimeUnit.SECONDS.sleep(2);
}
private static void print(String msg){
System.out.println(msg);
}
求質(zhì)數(shù)
實現(xiàn)方案有如下2種:
- 將Parallel task 直接提交給自定義的ForkJoinPool中
- 將自定義線程池傳遞到完整的future.supplyAsync方法中
public class StreamTest {
@Test
public void testCompletableFuture()throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Long>> primes = CompletableFuture.supplyAsync(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(StreamTest::isPrime).boxed().collect(toList()),
forkJoinPool
);
forkJoinPool.shutdown();
System.out.println(primes.get());
}
@Test
public void testCustomForkJoinPool() throws InterruptedException {
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(StreamTest::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
private static void print(String msg){
System.out.println(msg);
}
}
注意事項:小心內(nèi)存泄漏【Memory Leak】
正如前面所討論的,整個應用程序默認使用公共線程池。公共線程池是一個靜態(tài)ThreadPool實例。
因此,如果使用默認線程池,就不會發(fā)生內(nèi)存泄漏。
但是針對使用自定義線程池的場景下,customThreadPool對象不會被解引用和垃圾收集——相反,它將等待分配新任務【the customThreadPool object won’t be dereferenced and garbage collected — instead, it will be waiting for new tasks to be assigned】
也就是說,每次調(diào)用測試方法時,都會創(chuàng)建一個新的customThreadPool對象,并且它不會被釋放。文章來源:http://www.zghlxwxcb.cn/news/detail-488929.html
解決這個問題很簡單:在執(zhí)行了這個方法之后關閉customThreadPool對象:文章來源地址http://www.zghlxwxcb.cn/news/detail-488929.html
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}finally {
customThreadPool.shutdown();
}
}
到了這里,關于JAVA-- 在Java8 Parallel Stream中如何自定義線程池?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!