DUCC配置平臺實現(xiàn)一個動態(tài)化線程池示例代碼
作者:京東零售 張賓
1.背景
在后臺開發(fā)中,會經(jīng)常用到線程池技術(shù),對于線程池核心參數(shù)的配置很大程度上依靠經(jīng)驗。然而,由于系統(tǒng)運行過程中存在的不確定性,我們很難一勞永逸地規(guī)劃一個合理的線程池參數(shù)。在對線程池配置參數(shù)進行調(diào)整時,一般需要對服務進行重啟,這樣修改的成本就會偏高。一種解決辦法就是,將線程池的配置放到配置平臺側(cè),系統(tǒng)運行期間開發(fā)人員根據(jù)系統(tǒng)運行情況對核心參數(shù)進行動態(tài)配置。
本文以公司DUCC配置平臺作為服務配置中心,以修改線程池核心線程數(shù)、最大線程數(shù)為例,實現(xiàn)一個簡單的動態(tài)化線程池。
2.代碼實現(xiàn)
當前項目中使用的是Spring 框架提供的線程池類ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底層又使用里了JDK中線程池類ThreadPoolExecutor,線程池類ThreadPoolExecutor有兩個成員方法setCorePoolSize、setMaximumPoolSize可以在運行時設置核心線程數(shù)和最大線程數(shù)。
setCorePoolSize方法執(zhí)行流程是:首先會覆蓋之前構(gòu)造函數(shù)設置的corePoolSize,然后,如果新的值比原始值要小,當多余的工作線程下次變成空閑狀態(tài)的時候會被中斷并銷毀,如果新的值比原來的值要大且工作隊列不為空,則會創(chuàng)建新的工作線程。流程圖如下:
setMaximumPoolSize方法: 首先會覆蓋之前構(gòu)造函數(shù)設置的maximumPoolSize,然后,如果新的值比原來的值要小,當多余的工作線程下次變成空閑狀態(tài)的時候會被中斷并銷毀。
Spring 框架提供的線程池類ThreadPoolTaskExecutor,此類封裝了對ThreadPoolExecutor有兩個成員方法setCorePoolSize、setMaximumPoolSize的調(diào)用。
基于以上源代碼分析,要實現(xiàn)一個簡單的動態(tài)線程池需要以下幾步:
(1)定義一個動態(tài)線程池類,繼承ThreadPoolTaskExecutor,目的跟非動態(tài)配置的線程池類ThreadPoolTaskExecutor區(qū)分開;
(2)定義和實現(xiàn)一個動態(tài)線程池配置定時刷的類,目的定時對比ducc配置的線程池數(shù)和本地應用中線程數(shù)是否一致,若不一致,則更新本地動態(tài)線程池線程池數(shù);
(3)引入公司ducc配置平臺相關(guān)jar包并創(chuàng)建一個動態(tài)線程池配置key;
(4)定義和實現(xiàn)一個應用啟動后根據(jù)動態(tài)線程池Bean和從ducc配置平臺拉取配置刷新應用中的線程數(shù)配置;
接下來代碼一一實現(xiàn):
(1)動態(tài)線程池類
/** * 動態(tài)線程池 * */ public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { }
(2)動態(tài)線程池配置定時刷新類
@Slf4j public class DynamicThreadPoolRefresh implements InitializingBean { /** * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor. */ private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>(); /** * @param threadPoolBeanName * @param threadPoolTaskExecutor */ public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) { log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor())); DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor); } @Override public void afterPropertiesSet() throws Exception { this.refresh(); //創(chuàng)建定時任務線程池 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build()); //延遲1秒執(zhí)行,每個1分鐘check一次 executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS); } private void refresh() { String dynamicThreadPool = ""; try { if (DTP_REGISTRY.isEmpty()) { log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty"); return; } dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL); if (StringUtils.isBlank(dynamicThreadPool)) { log.debug("DynamicThreadPool refresh dynamicThreadPool not config"); return; } log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool); List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() { }); if (CollectionUtils.isEmpty(threadPoolPropertiesList)) { log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool); return; } for (ThreadPoolProperties properties : threadPoolPropertiesList) { doRefresh(properties); } } catch (Exception e) { log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e); } } /** * @param properties */ private void doRefresh(ThreadPoolProperties properties) { if (StringUtils.isBlank(properties.getThreadPoolBeanName()) || properties.getCorePoolSize() < 1 || properties.getMaxPoolSize() < 1 || properties.getMaxPoolSize() < properties.getCorePoolSize()) { log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties); return; } DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName()); if (Objects.isNull(threadPoolTaskExecutor)) { log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName()); return; } ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize()) && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName()); return; } if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) { threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize()); log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize()); } if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize()); log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize()); } ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp); } private class RefreshThreadPoolConfig extends TimerTask { private RefreshThreadPoolConfig() { } @Override public void run() { DynamicThreadPoolRefresh.this.refresh(); } } }
線程池配置類
@Data public class ThreadPoolProperties { /** * 線程池名稱 */ private String threadPoolBeanName; /** * 線程池核心線程數(shù)量 */ private int corePoolSize; /** * 線程池最大線程池數(shù)量 */ private int maxPoolSize; }
(3)引入公司ducc配置平臺相關(guān)jar包并創(chuàng)建一個動態(tài)線程池配置key
ducc配置平臺使用見:cf.jd.com/pages/viewp…?
動態(tài)線程池配置key:dynamic.thread.pool
配置value:
[ { "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor", "corePoolSize": 32, "maxPoolSize": 128 }]
(4) 應用啟動刷新應用本地動態(tài)線程池配置
@Slf4j public class DynamicThreadPoolPostProcessor implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DynamicThreadPoolTaskExecutor) { DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean); } return bean; } }
3.動態(tài)線程池應用
動態(tài)線程池Bean聲明
<!-- 普通線程池 --> <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper"> <!-- 核心線程數(shù),默認為 --> <property name="corePoolSize" value="128"/> <!-- 最大線程數(shù),默認為Integer.MAX_VALUE --> <property name="maxPoolSize" value="512"/> <!-- 隊列最大長度,一般需要設置值>=notifyScheduledMainExecutor.maxNum;默認為Integer.MAX_VALUE --> <property name="queueCapacity" value="500"/> <!-- 線程池維護線程所允許的空閑時間,默認為60s --> <property name="keepAliveSeconds" value="60"/> <!-- 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為后者 --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 --> <!-- CallerRunsPolicy:主線程直接執(zhí)行該任務,執(zhí)行完之后嘗試添加下一個任務到線程池中,可以有效降低向線程池內(nèi)添加任務的速度 --> <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 --> <!-- DiscardPolicy:拋棄當前任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> <!-- 動態(tài)線程池 --> <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor"> <!-- 核心線程數(shù),默認為 --> <property name="corePoolSize" value="32"/> <!-- 最大線程數(shù),默認為Integer.MAX_VALUE --> <property name="maxPoolSize" value="128"/> <!-- 隊列最大長度,一般需要設置值>=notifyScheduledMainExecutor.maxNum;默認為Integer.MAX_VALUE --> <property name="queueCapacity" value="500"/> <!-- 線程池維護線程所允許的空閑時間,默認為60s --> <property name="keepAliveSeconds" value="60"/> <!-- 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為后者 --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 --> <!-- CallerRunsPolicy:主線程直接執(zhí)行該任務,執(zhí)行完之后嘗試添加下一個任務到線程池中,可以有效降低向線程池內(nèi)添加任務的速度 --> <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 --> <!-- DiscardPolicy:拋棄當前任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> <!-- 動態(tài)線程池刷新配置 --> <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/> <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>
業(yè)務類注入Spring Bean后,直接使用即可
@Resource private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor; Runnable asyncTask = ()->{...}; CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);
4.小結(jié)
本文從實際項目的業(yè)務痛點場景出發(fā),并基于公司已有的ducc配置平臺簡單實現(xiàn)了線程池線程數(shù)量可配置。
以上就是DUCC配置平臺實現(xiàn)一個動態(tài)化線程池示例代碼的詳細內(nèi)容,更多關(guān)于DUCC配置平臺動態(tài)化線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java中javamail收發(fā)郵件實現(xiàn)方法
這篇文章主要為大家詳細介紹了java中javamail收發(fā)郵件實現(xiàn)方法,實例分析了javamail的使用方法與相關(guān)注意事項,非常具有實用價值,感興趣的小伙伴們可以參考一下2016-02-02java: 程序包com.fasterxml.jackson.annotation不存在的解決辦法
當我們在導入程序之后,系統(tǒng)給出錯誤提示:java: 程序包com.fasterxml.jackson.annotation不存在,本文主要介紹了Java程序包不存在的三種解決方法,需要的朋友可以參考下2024-02-02idea創(chuàng)建springboot項目和springcloud項目的詳細教程
這篇文章主要介紹了idea創(chuàng)建springboot項目和springcloud項目方法,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10