Java自定義線程池的實(shí)現(xiàn)示例
一、Java語(yǔ)言本身也是多線程,回顧Java創(chuàng)建線程方式如下:
1、繼承Thread類,(Thread類實(shí)現(xiàn)Runnable接口),來(lái)個(gè)類圖加深印象。
2、實(shí)現(xiàn)Runnable接口實(shí)現(xiàn)無(wú)返回值、實(shí)現(xiàn)run()方法,啥時(shí)候run,黑話了。
3、實(shí)現(xiàn)Callable接口重寫call()+FutureTask獲取.
public class CustomThread { public static void main(String[] args) { // 自定義線程 new Thread(new Runnable() { @Override public void run() { System.out.println("Custom Run"); System.out.println(Thread.currentThread().getName()); } },"custom-thread-1").start(); } }
4、基于線程池集中管理創(chuàng)建線程系列周期.【本篇文章重點(diǎn)介紹】
二、JDK線程池工具類.
1、Executors工具類,是JDK中Doug Lea大佬實(shí)現(xiàn)供開發(fā)者使用。
隨著JDK版本迭代逐漸加入了基于工作竊取算法的線程池了,阿里編碼規(guī)范也推薦開發(fā)者自定義線程池,禁止生產(chǎn)直接使用Executos線程池工具類,因此很有可能造成OOM異常。同時(shí)在某些類型的線程池里面,使用無(wú)界隊(duì)列還會(huì)導(dǎo)致maxinumPoolSize、keepAliveTime、handler等參數(shù)失效。因此目前在大廠的開發(fā)規(guī)范中會(huì)強(qiáng)調(diào)禁止使用Executors來(lái)創(chuàng)建線程池。這里說(shuō)道阻塞隊(duì)列。LinkedBlockingQueue。
2、自定義線程池工具類基于ThreadPoolExecutor實(shí)現(xiàn),那個(gè)JDK封裝的線程池工具類也是基于這個(gè)ThreadPoolExecutor實(shí)現(xiàn)的。
public class ConstomThreadPool extends ThreadPoolExecutor{ /** * * @param corePoolSize 核心線程池 * @param maximumPoolSize 線程池最大數(shù)量 * @param keepAliveTime 線程存活時(shí)間 * @param unit TimeUnit * @param workQueue 工作隊(duì)列,自定義大小 * @param poolName 線程工廠自定義線程名稱 */ public ConstomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); setThreadFactory(new CustomThreadFactory(poolName, false)); } }
自定義線程工廠類,這樣線程命名有開發(fā)者控制實(shí)現(xiàn)了,這樣參數(shù)可以做到可配置化,生產(chǎn)環(huán)境可以供不同業(yè)務(wù)模塊使用,如果系統(tǒng)配置值不生效,就給一個(gè)默認(rèn)值,更加滿足業(yè)務(wù)需要.
/** * 自定義線程工廠 */ public class CustomThreadFactory implements ThreadFactory { /** * 線程前綴,采用AtomicInteger實(shí)現(xiàn)線程編號(hào)線程安全自增 */ private final AtomicInteger atomicInteger = new AtomicInteger(1); /** * 線程命名前綴 */ private final String namePrefix; /** * 線程工廠創(chuàng)建的線程是否是守護(hù)線程 */ private final boolean isDaemon; public CustomThreadFactory(String prefix, boolean daemin) { if (StringUtils.isNoneBlank(prefix)) { this.namePrefix = prefix; } else { this.namePrefix = "thread_pool"; } // 是否是守護(hù)線程 isDaemon = daemin; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, namePrefix + "-" + atomicInteger.getAndIncrement()); thread.setDaemon(isDaemon); // 設(shè)置線程優(yōu)先級(jí) if (thread.getPriority() != Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }
這里Spring框架提供的自定義線程池工廠類,當(dāng)然了一些開源包也會(huì)提供這樣的輪子,這個(gè)比較簡(jiǎn)單了.
@SuppressWarnings("serial") public class CustomizableThreadFactory extends CustomizableThreadCreator implements ThreadFactory { /** * Create a new CustomizableThreadFactory with default thread name prefix. */ public CustomizableThreadFactory() { super(); } /** * Create a new CustomizableThreadFactory with the given thread name prefix. * @param threadNamePrefix the prefix to use for the names of newly created threads */ public CustomizableThreadFactory(String threadNamePrefix) { super(threadNamePrefix); } @Override public Thread newThread(Runnable runnable) { return createThread(runnable); } }
3、SpringBoot框架提供的自定義線程池,基于異步注解@Async名稱和一些業(yè)務(wù)自定義配置項(xiàng),很好的實(shí)現(xiàn)了業(yè)務(wù)間線程池的隔離。
@Configuration public class ThreadPoolConfig { /** * * @return ThreadPoolTaskExecutor */ @Bean("serviceTaskA") public ThreadPoolTaskExecutor serviceTaskA() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(10); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("service-a"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } /** * * @return ThreadPoolTaskExecutor */ @Bean("serviceTaskB") public ThreadPoolTaskExecutor serviceTaskB() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(10); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("service-b"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
整體來(lái)看是Spring框架對(duì)JDK的線程池做了封裝,公開發(fā)者使用,畢竟框架嘛,肯定是把方便留給開發(fā)者。
4、并發(fā)流線程池。
List<String> list = new ArrayList<>(4); list.add("A"); list.add("B"); list.add("C"); list.add("D"); list.parallelStream().forEach(string -> { string = string + "paralleStream"; System.out.println(Thread.currentThread().getName()+":-> "+string); });
運(yùn)行實(shí)例:
說(shuō)明:并發(fā)流默認(rèn)使用系統(tǒng)公共的線程池ForkJoinWorkerThread,供整個(gè)程序使用。
類圖如下,基于分治法,雙端竊取算法實(shí)現(xiàn)的一種線程池。
ForkJoin實(shí)現(xiàn)的了自己的線程工廠命名。
也可以自定義并發(fā)流線程,然后提交任務(wù),一般并發(fā)流適用于短暫耗時(shí)業(yè)務(wù),避免拖垮整個(gè)線程池業(yè)務(wù).
5、實(shí)現(xiàn)一個(gè)基于系統(tǒng)公用線程池工具類,運(yùn)行這個(gè)系統(tǒng)中的異步業(yè)務(wù).
public final class CustomExecutors { /** * 核心線程數(shù)大小 */ private static final int CORE_POOL_SIZE=5; /** * 核心線程池大小 */ private static final int MAX_POOL_SIZE=10; /** * 線程存活時(shí)間 */ private static final int KEEP_ALIVE_TIME=60; /** * 工作隊(duì)列大小 */ private static final LinkedBlockingQueue queue=new LinkedBlockingQueue(100); /** * 自定義線程池名前綴 */ private static final String POOL_PREFIX_NAME="Custom-Common-Pool"; private CustomExecutors(){ //throw new XXXXException("un support create pool!"); } private static ConstomThreadPool constomThreadPool; /** * 靜態(tài)塊初始化只執(zhí)行一次,不關(guān)閉,整個(gè)系統(tǒng)公用一個(gè)線程池 */ static { constomThreadPool=new ConstomThreadPool(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,queue,POOL_PREFIX_NAME); } /** * 單例模式獲取線程池 * @return ExecutorService */ private static ExecutorService getInstance(){ return constomThreadPool; } private static Future<?> submit(Runnable task){ return constomThreadPool.submit(task); } private static <T> Future<T> submit(Runnable task, T result){ return constomThreadPool.submit(task,result); } private static <T> Future<T> submit(Callable<T> task){ return constomThreadPool.submit(task); } private static void execute(Runnable task){ constomThreadPool.execute(task); } }
三、業(yè)界知名自定義線程池?cái)U(kuò)展使用.
1、org.apache.tomcat.util.threads;【Tomcat線程池】
2、XXL-JOB分布式任務(wù)調(diào)度框架的快慢線程池,線程池任務(wù)隔離.
public class JobTriggerPoolHelper { private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); // ---------------------- trigger pool ---------------------- // fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null; public void start(){ fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); } }); slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } }); } public void stop() { //triggerPool.shutdown(); fastTriggerPool.shutdownNow(); slowTriggerPool.shutdownNow(); logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); } // job timeout count private volatile long minTim = System.currentTimeMillis()/60000; // ms > min private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>(); /** * add trigger */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // choose thread pool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } // trigger triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { // do trigger XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); } // incr timeout-count-map long cost = System.currentTimeMillis()-start; if (cost > 500) { // ob-timeout threshold 500ms AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } } }); } // ---------------------- helper ---------------------- private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); public static void toStart() { helper.start(); } public static void toStop() { helper.stop(); } /** * @param jobId * @param triggerType * @param failRetryCount * >=0: use this param * <0: use param from job info config * @param executorShardingParam * @param executorParam * null: use job param * not null: cover job param */ public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } }
①、定義兩個(gè)線程池,一個(gè)是fastTriggerPool,另一個(gè)是slowTriggerPool。
②、定義一個(gè)容器ConcurrentMap,存放每個(gè)任務(wù)的執(zhí)行慢次數(shù),60秒后自動(dòng)清空該容器。
③、在線程的run()方法中計(jì)算每個(gè)任務(wù)的耗時(shí),如果大于500ms,則任務(wù)的慢執(zhí)行次數(shù)+1。
3、基于線程池動(dòng)態(tài)監(jiān)控動(dòng)態(tài)線程池
引用圖片,線程池常見問(wèn)題
還有比較多啦,例如ES的基于JDK的線程池,Dubbo中等。
到此這篇關(guān)于Java自定義線程池的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Java自定義線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于后綴表達(dá)式的java實(shí)現(xiàn)過(guò)程
這篇文章主要介紹了關(guān)于后綴表達(dá)式的java實(shí)現(xiàn)過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07使用Spring RestTemplate 詳解實(shí)踐使用及拓展增強(qiáng)
這篇文章主要介紹了使用Spring RestTemplate 詳解實(shí)踐使用及拓展增強(qiáng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10Java實(shí)現(xiàn)輕松處理日期和時(shí)間的API小結(jié)
這篇文章主要為大家詳細(xì)介紹了Java中的日期和時(shí)間API,可以輕松處理日期和時(shí)間,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-03-03在Eclipse中運(yùn)行Solr 基礎(chǔ)知識(shí)
Solr我還是個(gè)菜鳥,寫這一些文章只是記錄一下最近一段時(shí)間學(xué)習(xí)Solr的心得,望各位同仁不要見笑,還希望多多指點(diǎn)2012-11-11關(guān)于@ApiModel和@ApiModelProperty的使用
這篇文章主要介紹了關(guān)于@ApiModel和@ApiModelProperty的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11Spring與Struts整合之讓Spring管理控制器操作示例
這篇文章主要介紹了Spring與Struts整合之讓Spring管理控制器操作,結(jié)合實(shí)例形式詳細(xì)分析了Spring管理控制器相關(guān)配置、接口實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2020-01-01