詳解如何自定義parallelStream線程池
ForkJoinPool
本文主要研究一下parallelStream怎么使用自定義的線程池
java/util/concurrent/ForkJoinPool.java
public class ForkJoinPool extends AbstractExecutorService {
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = new DefaultCommonPoolForkJoinWorkerThreadFactory();
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
}parallelStream默認(rèn)使用的是common的ForkJoinPool,可以通過系統(tǒng)屬性來設(shè)置parallelism等
ForkJoinPoolFactoryBean
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {
private boolean commonPool = false;
private int parallelism = Runtime.getRuntime().availableProcessors();
private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
@Nullable
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
private boolean asyncMode = false;
private int awaitTerminationSeconds = 0;
@Nullable
private ForkJoinPool forkJoinPool;
//......
@Override
public void destroy() {
if (this.forkJoinPool != null) {
// Ignored for the common pool.
this.forkJoinPool.shutdown();
// Wait for all tasks to terminate - works for the common pool as well.
if (this.awaitTerminationSeconds > 0) {
try {
this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
}spring3.1提供了ForkJoinPoolFactoryBean,可以用于創(chuàng)建并托管forkJoinPool
示例
配置
@Configuration
public class ForkJoinConfig {
@Bean
public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
factoryBean.setCommonPool(false);
// NOTE LIFO_QUEUE FOR working steal from tail of queue
factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE
factoryBean.setParallelism(10);
// factoryBean.setUncaughtExceptionHandler();
factoryBean.setAwaitTerminationSeconds(60);
return factoryBean;
}
}使用
@Autowired
ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;
public void streamParallel() throws ExecutionException, InterruptedException {
List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
@Override
public List<TodoTask> call() throws Exception {
return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
log.info("thread:{}", Thread.currentThread().getName());
return new TodoTask(i, "name"+i);
}).collect(Collectors.toList());
}
}).get();
result.stream().forEach(System.out::println);
} common的workerName前綴為ForkJoinPool.commonPool-worker-
自定義的workerName前綴默認(rèn)為ForkJoinPool- nextPoolId() -worker-
小結(jié)
parallelStream默認(rèn)使用的是commonPool,是static代碼塊默認(rèn)初始化,針對個(gè)別場景可以自定義ForkJoinPool,將parallelStream作為一個(gè)任務(wù)丟進(jìn)去,這樣子就不會(huì)影響默認(rèn)的commonPool。
以上就是如何自定義parallelStream的線程池的詳細(xì)內(nèi)容,更多關(guān)于如何自定義parallelStream的線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java設(shè)計(jì)模式之java狀態(tài)模式詳解
這篇文章主要介紹了Java設(shè)計(jì)模式之狀態(tài)模式定義與用法,結(jié)合具體實(shí)例形式詳細(xì)分析了Java狀態(tài)模式的概念、原理、定義及相關(guān)操作技巧,需要的朋友可以參考下2021-09-09
Java 數(shù)據(jù)流之Broadcast State
這篇文章主要介紹了Java 數(shù)據(jù)流之Broadcast State,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09
SpringBoot+MyBatis-Flex配置ProxySQL的實(shí)現(xiàn)步驟
本文主要介紹了SpringBoot+MyBatis-Flex配置ProxySQL的實(shí)現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-02-02
Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(8)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07
SpringBoot整合JPA詳細(xì)代碼實(shí)例
這篇文章主要給大家介紹了關(guān)于SpringBoot整合JPA的相關(guān)資料,JPA(Java Persistence API)是Sun官方提出的Java持久化規(guī)范,它為Java開發(fā)人員提供了一種對象/關(guān)聯(lián)映射工具來管理Java應(yīng)用中的關(guān)系數(shù)據(jù),需要的朋友可以參考下2024-05-05
java實(shí)現(xiàn)圖的鄰接表存儲(chǔ)結(jié)構(gòu)的兩種方式及實(shí)例應(yīng)用詳解
這篇文章主要介紹了java實(shí)現(xiàn)圖的鄰接表存儲(chǔ)結(jié)構(gòu)的兩種方式及實(shí)例應(yīng)用詳解,鄰接表構(gòu)建圖是必須需要一個(gè)Graph對象,也就是圖對象!該對象包含屬性有:頂點(diǎn)數(shù)、邊數(shù)以及圖的頂點(diǎn)集合,需要的朋友可以參考下2019-06-06
idea啟動(dòng)項(xiàng)目報(bào)端口號沖突或被占用的解決方法
這篇文章主要介紹了idea啟動(dòng)項(xiàng)目報(bào)端口號沖突或被占用的解決方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-10-10
使用MyBatis-Plus實(shí)現(xiàn)聯(lián)表查詢分頁的示例代碼
本文主要講述了如何在SpringBoot項(xiàng)目中使用MyBatis-Plus的分頁插件,通過這個(gè)示例,可以學(xué)會(huì)如何利用MyBatis-Plus進(jìn)行高效的分頁查詢,感興趣的可以了解一下2024-10-10

