欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

DUCC配置平臺實現(xiàn)一個動態(tài)化線程池示例代碼

 更新時間:2023年02月16日 16:54:40   作者:京東云開發(fā)者  
這篇文章主要為大家介紹了DUCC配置平臺實現(xiàn)一個動態(tài)化線程池示例代碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

作者:京東零售 張賓

1.背景

在后臺開發(fā)中,會經(jīng)常用到線程池技術(shù),對于線程池核心參數(shù)的配置很大程度上依靠經(jīng)驗。然而,由于系統(tǒng)運行過程中存在的不確定性,我們很難一勞永逸地規(guī)劃一個合理的線程池參數(shù)。在對線程池配置參數(shù)進行調(diào)整時,一般需要對服務進行重啟,這樣修改的成本就會偏高。一種解決辦法就是,將線程池的配置放到配置平臺側(cè),系統(tǒng)運行期間開發(fā)人員根據(jù)系統(tǒng)運行情況對核心參數(shù)進行動態(tài)配置。

本文以公司DUCC配置平臺作為服務配置中心,以修改線程池核心線程數(shù)、最大線程數(shù)為例,實現(xiàn)一個簡單的動態(tài)化線程池。

2.代碼實現(xiàn)

當前項目中使用的是Spring 框架提供的線程池類ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底層又使用里了JDK中線程池類ThreadPoolExecutor,線程池類ThreadPoolExecutor有兩個成員方法setCorePoolSize、setMaximumPoolSize可以在運行時設置核心線程數(shù)和最大線程數(shù)。

setCorePoolSize方法執(zhí)行流程是:首先會覆蓋之前構(gòu)造函數(shù)設置的corePoolSize,然后,如果新的值比原始值要小,當多余的工作線程下次變成空閑狀態(tài)的時候會被中斷并銷毀,如果新的值比原來的值要大且工作隊列不為空,則會創(chuàng)建新的工作線程。流程圖如下:

setMaximumPoolSize方法: 首先會覆蓋之前構(gòu)造函數(shù)設置的maximumPoolSize,然后,如果新的值比原來的值要小,當多余的工作線程下次變成空閑狀態(tài)的時候會被中斷并銷毀。

Spring 框架提供的線程池類ThreadPoolTaskExecutor,此類封裝了對ThreadPoolExecutor有兩個成員方法setCorePoolSize、setMaximumPoolSize的調(diào)用。

基于以上源代碼分析,要實現(xiàn)一個簡單的動態(tài)線程池需要以下幾步:

(1)定義一個動態(tài)線程池類,繼承ThreadPoolTaskExecutor,目的跟非動態(tài)配置的線程池類ThreadPoolTaskExecutor區(qū)分開;

(2)定義和實現(xiàn)一個動態(tài)線程池配置定時刷的類,目的定時對比ducc配置的線程池數(shù)和本地應用中線程數(shù)是否一致,若不一致,則更新本地動態(tài)線程池線程池數(shù);

(3)引入公司ducc配置平臺相關(guān)jar包并創(chuàng)建一個動態(tài)線程池配置key;

(4)定義和實現(xiàn)一個應用啟動后根據(jù)動態(tài)線程池Bean和從ducc配置平臺拉取配置刷新應用中的線程數(shù)配置;

接下來代碼一一實現(xiàn):

(1)動態(tài)線程池類

/**
 * 動態(tài)線程池
 *
 */
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)動態(tài)線程池配置定時刷新類

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
    /**
     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
     */
    private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();
    /**
     * @param threadPoolBeanName
     * @param threadPoolTaskExecutor
     */
    public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
        log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
        DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        this.refresh();
        //創(chuàng)建定時任務線程池
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
        //延遲1秒執(zhí)行,每個1分鐘check一次
        executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
    }
    private void refresh() {
        String dynamicThreadPool = "";
        try {
            if (DTP_REGISTRY.isEmpty()) {
                log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                return;
            }
            dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
            if (StringUtils.isBlank(dynamicThreadPool)) {
                log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                return;
            }
            log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
            List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
            });
            if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
                log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                return;
            }
            for (ThreadPoolProperties properties : threadPoolPropertiesList) {
                doRefresh(properties);
            }
        } catch (Exception e) {
            log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
        }
    }
    /**
     * @param properties
     */
    private void doRefresh(ThreadPoolProperties properties) {
        if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                || properties.getCorePoolSize() < 1
                || properties.getMaxPoolSize() < 1
                || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
            log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
        if (Objects.isNull(threadPoolTaskExecutor)) {
            log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
            return;
        }
        ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
            return;
        }
        if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
            threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
            log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
        }
        if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
            log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
        }
        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
    }
    private class RefreshThreadPoolConfig extends TimerTask {
        private RefreshThreadPoolConfig() {
        }
        @Override
        public void run() {
            DynamicThreadPoolRefresh.this.refresh();
        }
    }
}

線程池配置類

@Data
public class ThreadPoolProperties {
    /**
     * 線程池名稱
     */
    private String threadPoolBeanName;
    /**
     * 線程池核心線程數(shù)量
     */
    private int corePoolSize;
    /**
     * 線程池最大線程池數(shù)量
     */
    private int maxPoolSize;
}

(3)引入公司ducc配置平臺相關(guān)jar包并創(chuàng)建一個動態(tài)線程池配置key

ducc配置平臺使用見:cf.jd.com/pages/viewp…?

動態(tài)線程池配置key:dynamic.thread.pool

配置value:

[  {    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",    "corePoolSize": 32,    "maxPoolSize": 128  }]

(4) 應用啟動刷新應用本地動態(tài)線程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DynamicThreadPoolTaskExecutor) {
            DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
        }
        return bean;
    }
}

3.動態(tài)線程池應用

動態(tài)線程池Bean聲明

    <!-- 普通線程池 -->
    <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
        <!-- 核心線程數(shù),默認為 -->
        <property name="corePoolSize" value="128"/>
        <!-- 最大線程數(shù),默認為Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="512"/>
        <!-- 隊列最大長度,一般需要設置值>=notifyScheduledMainExecutor.maxNum;默認為Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 線程池維護線程所允許的空閑時間,默認為60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
            <!-- CallerRunsPolicy:主線程直接執(zhí)行該任務,執(zhí)行完之后嘗試添加下一個任務到線程池中,可以有效降低向線程池內(nèi)添加任務的速度 -->
            <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 -->
            <!-- DiscardPolicy:拋棄當前任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 動態(tài)線程池 -->
    <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
        <!-- 核心線程數(shù),默認為 -->
        <property name="corePoolSize" value="32"/>
        <!-- 最大線程數(shù),默認為Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="128"/>
        <!-- 隊列最大長度,一般需要設置值>=notifyScheduledMainExecutor.maxNum;默認為Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 線程池維護線程所允許的空閑時間,默認為60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
            <!-- CallerRunsPolicy:主線程直接執(zhí)行該任務,執(zhí)行完之后嘗試添加下一個任務到線程池中,可以有效降低向線程池內(nèi)添加任務的速度 -->
            <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 -->
            <!-- DiscardPolicy:拋棄當前任務、暫不支持;會導致被丟棄的任務無法再次被執(zhí)行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 動態(tài)線程池刷新配置 -->
    <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
    <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

業(yè)務類注入Spring Bean后,直接使用即可

 @Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;
 Runnable asyncTask = ()-&gt;{...};
 CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小結(jié)

本文從實際項目的業(yè)務痛點場景出發(fā),并基于公司已有的ducc配置平臺簡單實現(xiàn)了線程池線程數(shù)量可配置。

以上就是DUCC配置平臺實現(xiàn)一個動態(tài)化線程池示例代碼的詳細內(nèi)容,更多關(guān)于DUCC配置平臺動態(tài)化線程池的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • MyBatis中操作類對象的實現(xiàn)

    MyBatis中操作類對象的實現(xiàn)

    在MyBatis框架中,操作類對象是用于執(zhí)行數(shù)據(jù)庫操作的核心對象,本文主要介紹了MyBatis中操作類對象的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下
    2023-11-11
  • Mybatis-Plus和Mybatis的區(qū)別詳解

    Mybatis-Plus和Mybatis的區(qū)別詳解

    這篇文章主要介紹了Mybatis-Plus和Mybatis的區(qū)別,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-08-08
  • Springboot框架實現(xiàn)自動裝配詳解

    Springboot框架實現(xiàn)自動裝配詳解

    在使用springboot時,很多配置我們都沒有做,都是springboot在幫我們完成,這很大一部分歸功于springboot自動裝配。本文將詳細為大家講解SpringBoot的自動裝配原理,需要的可以參考一下
    2022-08-08
  • java+opencv實現(xiàn)人臉識別功能

    java+opencv實現(xiàn)人臉識別功能

    這篇文章主要介紹了java+opencv實現(xiàn)人臉識別功能,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-05-05
  • Spring+Quartz配置定時任務實現(xiàn)代碼

    Spring+Quartz配置定時任務實現(xiàn)代碼

    這篇文章主要介紹了Spring+Quartz配置定時任務實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-04-04
  • java中javamail收發(fā)郵件實現(xiàn)方法

    java中javamail收發(fā)郵件實現(xiàn)方法

    這篇文章主要為大家詳細介紹了java中javamail收發(fā)郵件實現(xiàn)方法,實例分析了javamail的使用方法與相關(guān)注意事項,非常具有實用價值,感興趣的小伙伴們可以參考一下
    2016-02-02
  • java: 程序包com.fasterxml.jackson.annotation不存在的解決辦法

    java: 程序包com.fasterxml.jackson.annotation不存在的解決辦法

    當我們在導入程序之后,系統(tǒng)給出錯誤提示:java: 程序包com.fasterxml.jackson.annotation不存在,本文主要介紹了Java程序包不存在的三種解決方法,需要的朋友可以參考下
    2024-02-02
  • idea創(chuàng)建springboot項目和springcloud項目的詳細教程

    idea創(chuàng)建springboot項目和springcloud項目的詳細教程

    這篇文章主要介紹了idea創(chuàng)建springboot項目和springcloud項目方法,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-10-10
  • 基于Calendar獲取當前時間的性能比較

    基于Calendar獲取當前時間的性能比較

    這篇文章主要介紹了Calendar獲取當前時間的性能比較,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • 如何基于java或js獲取URL返回狀態(tài)碼

    如何基于java或js獲取URL返回狀態(tài)碼

    這篇文章主要介紹了如何基于java或js獲取URL返回狀態(tài)碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-11-11

最新評論