How to use a custom thread pool with parallelStream
This article looks at how to make parallelStream run on a custom thread pool.
ForkJoinPool
java/util/concurrent/ForkJoinPool.java
public class ForkJoinPool extends AbstractExecutorService {

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }
}
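By default, parallelStream runs on the common ForkJoinPool. Its parallelism, thread factory, and exception handler can be set through the system properties read in makeCommonPool().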
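The property lookup in makeCommonPool() can be tried out with a small program. What follows is a minimal sketch rather than part of the original article: the class name CommonPoolParallelismDemo, the helper configureAndRead, and the value 4 are all illustrative. The property is read only when the common pool is first initialized, so it has to be set before anything in the JVM touches the common pool; otherwise the call simply reports whatever parallelism was already in effect.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolParallelismDemo {

    // Property name taken from makeCommonPool() in the JDK source quoted above.
    static final String PARALLELISM_KEY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    /**
     * Sets the system property and then reads the common pool's parallelism.
     * The new value only takes effect if the common pool has not been
     * initialized yet in this JVM.
     */
    static int configureAndRead(int parallelism) {
        System.setProperty(PARALLELISM_KEY, String.valueOf(parallelism));
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        // Hypothetical value chosen for illustration only.
        System.out.println("common pool parallelism = " + configureAndRead(4));
    }
}
```

Passing the same property on the command line with -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 avoids the ordering problem, because the value is then present before any class is loaded.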
ForkJoinPoolFactoryBean
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {

    private boolean commonPool = false;

    private int parallelism = Runtime.getRuntime().availableProcessors();

    private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;

    @Nullable
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    private boolean asyncMode = false;

    private int awaitTerminationSeconds = 0;

    @Nullable
    private ForkJoinPool forkJoinPool;

    //......

    @Override
    public void destroy() {
        if (this.forkJoinPool != null) {
            // Ignored for the common pool.
            this.forkJoinPool.shutdown();

            // Wait for all tasks to terminate - works for the common pool as well.
            if (this.awaitTerminationSeconds > 0) {
                try {
                    this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
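Spring has provided ForkJoinPoolFactoryBean since version 3.1. It creates a ForkJoinPool and manages its lifecycle, shutting the pool down when the bean is destroyed.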
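To make the lifecycle handling concrete without a Spring container, the shutdown logic of destroy() can be mirrored in plain Java. This is only a sketch under stated assumptions: ManagedForkJoinPool is a hypothetical wrapper invented for illustration, not a Spring or JDK class, and it covers just the shutdown-and-wait part of what the factory bean does.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: owns a ForkJoinPool and shuts it down on close(),
// following the same steps as ForkJoinPoolFactoryBean.destroy().
public class ManagedForkJoinPool implements AutoCloseable {

    private final ForkJoinPool pool;
    private final int awaitTerminationSeconds;

    public ManagedForkJoinPool(int parallelism, int awaitTerminationSeconds) {
        this.pool = new ForkJoinPool(parallelism);
        this.awaitTerminationSeconds = awaitTerminationSeconds;
    }

    public ForkJoinPool get() {
        return pool;
    }

    @Override
    public void close() {
        // Stop accepting new tasks; already submitted tasks still run.
        pool.shutdown();
        if (awaitTerminationSeconds > 0) {
            try {
                // Give running tasks a bounded amount of time to finish.
                pool.awaitTermination(awaitTerminationSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag, as the Spring code does.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        try (ManagedForkJoinPool managed = new ManagedForkJoinPool(4, 5)) {
            int sum = managed.get().submit(() -> 1 + 2).get();
            System.out.println("sum = " + sum);
        }
    }
}
```

In a Spring application the factory bean makes this wrapper unnecessary, since the container calls destroy() when the context closes.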
Example
Configuration
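import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ForkJoinPoolFactoryBean;

@Configuration
public class ForkJoinConfig {

    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        // Create a dedicated pool instead of exposing the common pool.
        factoryBean.setCommonPool(false);
        // NOTE: asyncMode true selects FIFO_QUEUE, false selects LIFO_QUEUE (the default).
        factoryBean.setAsyncMode(true);
        factoryBean.setParallelism(10);
        // factoryBean.setUncaughtExceptionHandler();
        // On shutdown, wait up to 60 seconds for running tasks to finish.
        factoryBean.setAwaitTerminationSeconds(60);
        return factoryBean;
    }
}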
Usage
@Autowired
ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;

public void streamParallel() throws ExecutionException, InterruptedException {
    List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
        @Override
        public List<TodoTask> call() throws Exception {
            return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
                log.info("thread:{}", Thread.currentThread().getName());
                return new TodoTask(i, "name" + i);
            }).collect(Collectors.toList());
        }
    }).get();
    result.stream().forEach(System.out::println);
}
Worker threads in the common pool are named with the prefix ForkJoinPool.commonPool-worker-
Worker threads in a custom pool are named by default with the prefix "ForkJoinPool-" + nextPoolId() + "-worker-"
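The two naming schemes are easy to observe without Spring. The sketch below is an illustration, not code from the original article: WorkerNameDemo and its helper methods are hypothetical names. It also relies on behavior that the stream API does not formally document, namely that a parallel stream started from inside a ForkJoinPool worker forks its subtasks into that same pool.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class WorkerNameDemo {

    /** Runs a parallel stream and records the name of every thread that processed an element. */
    static Set<String> collectThreadNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.rangeClosed(1, 1000)
                 .parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    /** Runs the same parallel stream as a task submitted to the given pool. */
    static Set<String> collectThreadNamesIn(ForkJoinPool pool) throws Exception {
        Callable<Set<String>> task = WorkerNameDemo::collectThreadNames;
        return pool.submit(task).get();
    }

    public static void main(String[] args) throws Exception {
        // Started from the main thread: the caller and common-pool workers do the work.
        System.out.println("default: " + collectThreadNames());

        // Started from inside a custom pool: only that pool's workers do the work.
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("custom:  " + collectThreadNamesIn(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Which worker numbers appear, and how many, varies from run to run and with the number of available cores, so the printed sets are best read for their prefixes only.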
Summary
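By default, parallelStream runs on the commonPool, which is created in a static initializer block. For particular scenarios you can create your own ForkJoinPool and submit the parallelStream to it as a task, so that the work does not affect the default commonPool.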