欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解如何自定義parallelStream線程池

 更新時間:2023年07月10日 08:36:03   作者:codecraft  
這篇文章主要為大家介紹了如何自定義parallelStream的線程池實現示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

ForkJoinPool

本文主要研究一下parallelStream怎么使用自定義的線程池

java/util/concurrent/ForkJoinPool.java

public class ForkJoinPool extends AbstractExecutorService {
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }
}

 parallelStream默認使用的是common的ForkJoinPool,可以通過系統(tǒng)屬性來設置parallelism等

ForkJoinPoolFactoryBean

org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java

public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {
    private boolean commonPool = false;
    private int parallelism = Runtime.getRuntime().availableProcessors();
    private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
    @Nullable
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
    private boolean asyncMode = false;
    private int awaitTerminationSeconds = 0;
    @Nullable
    private ForkJoinPool forkJoinPool;
    //......
    @Override
    public void destroy() {
        if (this.forkJoinPool != null) {
            // Ignored for the common pool.
            this.forkJoinPool.shutdown();
            // Wait for all tasks to terminate - works for the common pool as well.
            if (this.awaitTerminationSeconds > 0) {
                try {
                    this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

 spring3.1提供了ForkJoinPoolFactoryBean,可以用于創(chuàng)建并托管forkJoinPool

示例

配置

@Configuration
public class ForkJoinConfig {
    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        factoryBean.setCommonPool(false);
        // NOTE LIFO_QUEUE FOR working steal from tail of queue
        factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE
        factoryBean.setParallelism(10);
        // factoryBean.setUncaughtExceptionHandler();
        factoryBean.setAwaitTerminationSeconds(60);
        return factoryBean;
    }
}

使用

@Autowired
    ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;
    public void streamParallel() throws ExecutionException, InterruptedException {
        List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
            @Override
            public List<TodoTask> call() throws Exception {
                return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
                    log.info("thread:{}", Thread.currentThread().getName());
                    return new TodoTask(i, "name"+i);
                }).collect(Collectors.toList());
            }
        }).get();
        result.stream().forEach(System.out::println);
    }

 common的workerName前綴為ForkJoinPool.commonPool-worker-
自定義的workerName前綴默認為ForkJoinPool- nextPoolId() -worker-

小結

parallelStream默認使用的是commonPool,是static代碼塊默認初始化,針對個別場景可以自定義ForkJoinPool,將parallelStream作為一個任務丟進去,這樣子就不會影響默認的commonPool。

以上就是如何自定義parallelStream的線程池的詳細內容,更多關于如何自定義parallelStream的線程池的資料請關注腳本之家其它相關文章!

相關文章

  • Java設計模式之java狀態(tài)模式詳解

    Java設計模式之java狀態(tài)模式詳解

    這篇文章主要介紹了Java設計模式之狀態(tài)模式定義與用法,結合具體實例形式詳細分析了Java狀態(tài)模式的概念、原理、定義及相關操作技巧,需要的朋友可以參考下
    2021-09-09
  • Java中的Semaphore信號量詳解

    Java中的Semaphore信號量詳解

    這篇文章主要介紹了Java中的Semaphore信號量詳解,Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,通過協(xié)調各個線程以保證合理地使用公共資源,需要的朋友可以參考下
    2023-12-12
  • Java 數據流之Broadcast State

    Java 數據流之Broadcast State

    這篇文章主要介紹了Java 數據流之Broadcast State,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-09-09
  • SpringBoot+MyBatis-Flex配置ProxySQL的實現步驟

    SpringBoot+MyBatis-Flex配置ProxySQL的實現步驟

    本文主要介紹了SpringBoot+MyBatis-Flex配置ProxySQL的實現步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2025-02-02
  • Java日常練習題,每天進步一點點(8)

    Java日常練習題,每天進步一點點(8)

    下面小編就為大家?guī)硪黄狫ava基礎的幾道練習題(分享)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望可以幫到你
    2021-07-07
  • SpringBoot整合JPA詳細代碼實例

    SpringBoot整合JPA詳細代碼實例

    這篇文章主要給大家介紹了關于SpringBoot整合JPA的相關資料,JPA(Java Persistence API)是Sun官方提出的Java持久化規(guī)范,它為Java開發(fā)人員提供了一種對象/關聯映射工具來管理Java應用中的關系數據,需要的朋友可以參考下
    2024-05-05
  • java實現圖的鄰接表存儲結構的兩種方式及實例應用詳解

    java實現圖的鄰接表存儲結構的兩種方式及實例應用詳解

    這篇文章主要介紹了java實現圖的鄰接表存儲結構的兩種方式及實例應用詳解,鄰接表構建圖是必須需要一個Graph對象,也就是圖對象!該對象包含屬性有:頂點數、邊數以及圖的頂點集合,需要的朋友可以參考下
    2019-06-06
  • 解決mybatis-plus 查詢耗時慢的問題

    解決mybatis-plus 查詢耗時慢的問題

    這篇文章主要介紹了解決mybatis-plus 查詢耗時慢的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • idea啟動項目報端口號沖突或被占用的解決方法

    idea啟動項目報端口號沖突或被占用的解決方法

    這篇文章主要介紹了idea啟動項目報端口號沖突或被占用的解決方法,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-10-10
  • 使用MyBatis-Plus實現聯表查詢分頁的示例代碼

    使用MyBatis-Plus實現聯表查詢分頁的示例代碼

    本文主要講述了如何在SpringBoot項目中使用MyBatis-Plus的分頁插件,通過這個示例,可以學會如何利用MyBatis-Plus進行高效的分頁查詢,感興趣的可以了解一下
    2024-10-10

最新評論