国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

JAVA-- 在Java8 Parallel Stream中如何自定義線程池?

這篇具有很好參考價值的文章主要介紹了JAVA-- 在Java8 Parallel Stream中如何自定義線程池?。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

使用Parallel Stream時,在適當?shù)沫h(huán)境中,通過適當?shù)厥褂貌⑿卸燃墑e,可以在某些情況下獲得性能提升。
如果程序創(chuàng)建一個自定義ThreadPool,必須記住調(diào)用它的shutdown()方法來避免內(nèi)存泄漏。

Parallel Stream默認使用的線程池

如下代碼示例,Parallel Stream并行處理使用的線程池是ForkJoinPool.commonPool(),這個線程池是由整個應用程序共享的線程池。

@Test
    public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
        List<Long> aList = new ArrayList<>();
        Stream<Long> parallelStream = aList.parallelStream();

        assertTrue(parallelStream.isParallel());
    }

如何自定義線程池

簡單示例

如下代碼示例說明如下:

  • 使用的ForkJoinPool構造函數(shù)的并行級別為4。為了確定不同環(huán)境下的最佳值,需要進行一些實驗,但一個好的經(jīng)驗法則是根據(jù)CPU的核數(shù)選擇數(shù)值。
  • 接下來,處理并行流的內(nèi)容,在reduce調(diào)用中對它們進行匯總。

這個簡單的示例可能不能充分說明使用自定義線程池的用處,但是在不希望將公共線程池與長時間運行的任務綁定在一起(例如處理來自網(wǎng)絡源的數(shù)據(jù))或應用程序中的其他組件正在使用公共線程池的情況下,其好處就很明顯了。

    @Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());

        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        long actualTotal = customThreadPool.submit(
            () -> aList.parallelStream().reduce(0L, Long::sum)).get();

        assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
    }

復雜點的示例

通過查看日志,可以看到使用了自定義的線程池,提高了并發(fā)處理的效率

	@Test
    public void testCustomThreadPool() throws ExecutionException, InterruptedException {
        List<Long> firstRange = LongStream.rangeClosed(1, 10).boxed()
            .collect(Collectors.toList());

        List<Long> secondRange = LongStream.rangeClosed(5000, 6000).boxed()
            .collect(Collectors.toList());

        ForkJoinPool forkJoinPool = new ForkJoinPool(3);
        Future<Long> future = forkJoinPool.submit(() -> {
            return firstRange.parallelStream().map((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在處理 "+number);
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                }finally {
                    return number;
                }
            }).reduce(0L, Long::sum);
        });
        assertEquals((1 + 10) * 10 / 2, future.get());

        forkJoinPool.shutdown();

        ForkJoinPool forkJoinPool2 = new ForkJoinPool(10);

        forkJoinPool2.submit(() -> {
            secondRange.parallelStream().forEach((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在處理 "+number);
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            });
        });
        forkJoinPool2.shutdown();
        TimeUnit.SECONDS.sleep(2);
    }
    
 	private static void print(String msg){
        System.out.println(msg);
    }

求質(zhì)數(shù)

實現(xiàn)方案有如下2種:

  • 將Parallel task 直接提交給自定義的ForkJoinPool中
  • 將自定義線程池傳遞到完整的future.supplyAsync方法中
public class StreamTest {
    @Test
    public void testCompletableFuture()throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        CompletableFuture<List<Long>> primes = CompletableFuture.supplyAsync(() ->
                //parallel task here, for example
                range(1, 1_000_000).parallel().filter(StreamTest::isPrime).boxed().collect(toList()),
            forkJoinPool
        );
         forkJoinPool.shutdown();
        System.out.println(primes.get());
    }

    @Test
    public void testCustomForkJoinPool() throws InterruptedException {
        final int parallelism = 4;
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            final List<Integer> primes = forkJoinPool.submit(() ->
                // Parallel task here, for example
                IntStream.range(1, 1_000_000).parallel()
                    .filter(StreamTest::isPrime)
                    .boxed().collect(Collectors.toList())
            ).get();
            System.out.println(primes);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }

    private static void print(String msg){
        System.out.println(msg);
    }
}

注意事項:小心內(nèi)存泄漏【Memory Leak】

正如前面所討論的,整個應用程序默認使用公共線程池。公共線程池是一個靜態(tài)ThreadPool實例。
因此,如果使用默認線程池,就不會發(fā)生內(nèi)存泄漏。

但是針對使用自定義線程池的場景下,customThreadPool對象不會被解引用和垃圾收集——相反,它將等待分配新任務【the customThreadPool object won’t be dereferenced and garbage collected — instead, it will be waiting for new tasks to be assigned】
也就是說,每次調(diào)用測試方法時,都會創(chuàng)建一個新的customThreadPool對象,并且它不會被釋放。

解決這個問題很簡單:在執(zhí)行了這個方法之后關閉customThreadPool對象:文章來源地址http://www.zghlxwxcb.cn/news/detail-488929.html

@Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        try {
            long actualTotal = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, Long::sum)).get();

            assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
        }finally {
            customThreadPool.shutdown();
        }
    }

到了這里,關于JAVA-- 在Java8 Parallel Stream中如何自定義線程池?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Java Stream對象并行處理方法parallel()

    ????????Stream.parallel() 方法是 Java 8 中 Stream API 提供的一種并行處理方式。在處理大量數(shù)據(jù)或者耗時操作時,使用 Stream.parallel() 方法可以充分利用多核 CPU 的優(yōu)勢,提高程序的性能。本文將從以下幾個方面對 Stream.parallel() 進行詳解。 什么是 Stream.parallel() 方法 ????????

    2024年02月16日
    瀏覽(24)
  • 使用Java8 Stream流中的Collectors.collectingAndThen()方法去重

    Collectors.collectingAndThen() 根據(jù)對象屬性進行去重操作 Collectors.collectingAndThen()方法屬于java8 Stream流中的 java.util.stream.Collectors ,此類實現(xiàn)了 java.util.stream.Collector 接口,還提供了大量的方法對Stream流中的元素進行 map 和 reduce 操作 在獲取任務的時候,會出現(xiàn)id重復的狀況,利用 Co

    2024年02月09日
    瀏覽(103)
  • 【Java基礎】Java8 使用 stream().filter()過濾List對象(查找符合條件的對象集合)

    【Java基礎】Java8 使用 stream().filter()過濾List對象(查找符合條件的對象集合)

    本篇主要說明在Java8及以上版本中,使用stream().filter()來過濾List對象,查找符合條件的集合。 集合對象以學生類(Student)為例,有學生的基本信息,包括:姓名,性別,年齡,身高,生日幾項。 我的學生類代碼如下: 下面來添加一些測試用的數(shù)據(jù),代碼如下: 添加過濾條件

    2024年02月12日
    瀏覽(96)
  • java8新特性Stream流中anyMatch和allMatch和noneMatch的使用!??!

    判斷數(shù)據(jù)列表中是否存在任意一個元素符合設置的predicate條件,如果是就返回true,否則返回false。 接口定義: boolean anyMatch(Predicate? super T predicate); 方法描述: 在anyMatch 接口定義中是接收 Predicate 類型參數(shù),在Lamdba表達式中 PredicateT 是接收一個T類型參數(shù),然后經(jīng)過邏輯驗證返

    2024年02月08日
    瀏覽(23)
  • Java8使用stream流給List<Map<String,Object>>分組(多字段key)

    Java8使用 stream流 給ListMapString,Object根據(jù)字段key 分組 一、項目場景: 從已得到的List集合中,根據(jù)某一元素(這里指map的key)進行分組,篩選出需要的數(shù)據(jù)。 如果是SQL的話則使用 group by 直接實現(xiàn),代碼的方式則如下: 使用到stream流的 Collectors.groupingBy() 方法。 二、代碼實現(xiàn) 1、首

    2024年02月02日
    瀏覽(87)
  • 使用java8 新特性stream流對List<Map<String, Object>>集合進行遍歷、過濾、查詢、去重、排序、分組

    對于一個ListMapString, Object類型的數(shù)據(jù),可以使用Java 8的新特性stream流來進行遍歷、過濾、查詢、去重、排序、分組等操作。 遍歷: 過濾: 查詢: 去重: 排序: 分組:

    2024年02月10日
    瀏覽(106)
  • Java8 Stream流的合并

    最近的需求里有這樣一個場景,要校驗一個集合中每個對象的多個Id的有效性。比如一個Customer對象,有3個Id: id1 , id2 , id3 ,要把這些Id全部取出來,然后去數(shù)據(jù)庫里查詢它們是否存在。 通常情況下,我們都是從集合中取出對象的某一個字段,像這樣: 現(xiàn)在要取3個字段,

    2024年02月02日
    瀏覽(29)
  • Java8中Stream詳細用法大全

    Java8中Stream詳細用法大全

    Java 8 是一個非常成功的版本,這個版本新增的Stream,配合同版本出現(xiàn)的Lambda ,給我們操作集合(Collection)提供了極大的便利。Stream流是JDK8新增的成員,允許以聲明性方式處理數(shù)據(jù)集合,可以把Stream流看作是遍歷數(shù)據(jù)集合的一個高級迭代器。Stream 是 Java8 中處理集合的關鍵抽

    2023年04月08日
    瀏覽(31)
  • Java8 函數(shù)式編程stream流

    Java 8 中新增的特性旨在幫助程序員寫出更好的代碼,其中對核心類庫的改進是很關鍵的一部分,也是本章的主要內(nèi)容。對核心類庫的改進主要包括集合類的 API 和新引入的流(Stream),流使程序員得以站在更高的抽象層次上對集合進行操作。下面將介紹stream流的用法。 ?場景

    2024年02月15日
    瀏覽(21)
  • Java8的Stream流的學習

    Stream可以由數(shù)組或集合創(chuàng)建,對流的操作分為兩種: 中間操作,每次返回一個新的流,可以有多個。 終端操作,每個流只能進行一次終端操作,終端操作結束后流無法再次使用。終端操作會產(chǎn)生一個新的集合或值。 stream和parallelStream的簡單區(qū)分:?stream是順序流,由主線程按

    2024年02月07日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包