詳解如何自定義parallelStream線程池
ForkJoinPool
本文主要研究一下parallelStream怎么使用自定義的線程池
java/util/concurrent/ForkJoinPool.java
public class ForkJoinPool extends AbstractExecutorService { public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); } private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = new DefaultCommonPoolForkJoinWorkerThreadFactory(); else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); } }
parallelStream默認(rèn)使用的是common的ForkJoinPool,可以通過系統(tǒng)屬性來設(shè)置parallelism等
ForkJoinPoolFactoryBean
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean { private boolean commonPool = false; private int parallelism = Runtime.getRuntime().availableProcessors(); private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory; @Nullable private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; private boolean asyncMode = false; private int awaitTerminationSeconds = 0; @Nullable private ForkJoinPool forkJoinPool; //...... @Override public void destroy() { if (this.forkJoinPool != null) { // Ignored for the common pool. this.forkJoinPool.shutdown(); // Wait for all tasks to terminate - works for the common pool as well. if (this.awaitTerminationSeconds > 0) { try { this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } } }
spring3.1提供了ForkJoinPoolFactoryBean,可以用于創(chuàng)建并托管forkJoinPool
示例
配置
@Configuration public class ForkJoinConfig { @Bean public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() { ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean(); factoryBean.setCommonPool(false); // NOTE LIFO_QUEUE FOR working steal from tail of queue factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE factoryBean.setParallelism(10); // factoryBean.setUncaughtExceptionHandler(); factoryBean.setAwaitTerminationSeconds(60); return factoryBean; } }
使用
@Autowired ForkJoinPoolFactoryBean forkJoinPoolFactoryBean; public void streamParallel() throws ExecutionException, InterruptedException { List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() { @Override public List<TodoTask> call() throws Exception { return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> { log.info("thread:{}", Thread.currentThread().getName()); return new TodoTask(i, "name"+i); }).collect(Collectors.toList()); } }).get(); result.stream().forEach(System.out::println); }
common的workerName前綴為ForkJoinPool.commonPool-worker-
自定義的workerName前綴默認(rèn)為ForkJoinPool- nextPoolId() -worker-
小結(jié)
parallelStream默認(rèn)使用的是commonPool,是static代碼塊默認(rèn)初始化,針對(duì)個(gè)別場(chǎng)景可以自定義ForkJoinPool,將parallelStream作為一個(gè)任務(wù)丟進(jìn)去,這樣子就不會(huì)影響默認(rèn)的commonPool。
以上就是如何自定義parallelStream的線程池的詳細(xì)內(nèi)容,更多關(guān)于如何自定義parallelStream的線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java設(shè)計(jì)模式之java狀態(tài)模式詳解
這篇文章主要介紹了Java設(shè)計(jì)模式之狀態(tài)模式定義與用法,結(jié)合具體實(shí)例形式詳細(xì)分析了Java狀態(tài)模式的概念、原理、定義及相關(guān)操作技巧,需要的朋友可以參考下2021-09-09Java 數(shù)據(jù)流之Broadcast State
這篇文章主要介紹了Java 數(shù)據(jù)流之Broadcast State,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09SpringBoot+MyBatis-Flex配置ProxySQL的實(shí)現(xiàn)步驟
本文主要介紹了SpringBoot+MyBatis-Flex配置ProxySQL的實(shí)現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-02-02Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(8)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07SpringBoot整合JPA詳細(xì)代碼實(shí)例
這篇文章主要給大家介紹了關(guān)于SpringBoot整合JPA的相關(guān)資料,JPA(Java Persistence API)是Sun官方提出的Java持久化規(guī)范,它為Java開發(fā)人員提供了一種對(duì)象/關(guān)聯(lián)映射工具來管理Java應(yīng)用中的關(guān)系數(shù)據(jù),需要的朋友可以參考下2024-05-05java實(shí)現(xiàn)圖的鄰接表存儲(chǔ)結(jié)構(gòu)的兩種方式及實(shí)例應(yīng)用詳解
這篇文章主要介紹了java實(shí)現(xiàn)圖的鄰接表存儲(chǔ)結(jié)構(gòu)的兩種方式及實(shí)例應(yīng)用詳解,鄰接表構(gòu)建圖是必須需要一個(gè)Graph對(duì)象,也就是圖對(duì)象!該對(duì)象包含屬性有:頂點(diǎn)數(shù)、邊數(shù)以及圖的頂點(diǎn)集合,需要的朋友可以參考下2019-06-06idea啟動(dòng)項(xiàng)目報(bào)端口號(hào)沖突或被占用的解決方法
這篇文章主要介紹了idea啟動(dòng)項(xiàng)目報(bào)端口號(hào)沖突或被占用的解決方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-10-10使用MyBatis-Plus實(shí)現(xiàn)聯(lián)表查詢分頁(yè)的示例代碼
本文主要講述了如何在SpringBoot項(xiàng)目中使用MyBatis-Plus的分頁(yè)插件,通過這個(gè)示例,可以學(xué)會(huì)如何利用MyBatis-Plus進(jìn)行高效的分頁(yè)查詢,感興趣的可以了解一下2024-10-10