java動態(tài)線程池的簡單實現思路
什么是動態(tài)線程池?
在線程池日常實踐中我們常常會遇到以下問題:
- 代碼中創(chuàng)建了一個線程池卻不知道核心參數設置多少比較合適。
- 參數設置好后,上線發(fā)現需要調整,改代碼重啟服務非常麻煩。
- 線程池相對于開發(fā)人員來說是個黑箱,運行情況在出現問題 前很難被感知。
因此,動態(tài)可監(jiān)控線程池一種針對以上痛點開發(fā)的線程池管理工具。主要可實現功能有:提供對 Spring 應用內線程池實例的全局管控、應用運行時動態(tài)變更線程池參數以及線程池數據采集和監(jiān)控閾值報警。
已經實現的優(yōu)秀開源動態(tài)線程池
hippo4j、dynamic-tp.....
實現思路
核心管理類
- 需要能實現對線程池的
- 服務注冊
- 獲取已經注冊好的線程池
以及對注冊號線程池參數的刷新。
對于每一個線程池,我們使用一個線程池名字作為標識每個線程池的唯一ID。
偽代碼實現
public class DtpRegistry { /** * 儲存線程池 */ private static final Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<>(); /** * 獲取線程池 * @param executorName 線程池名字 */ public static Executor getExecutor(String executorName) { return EXECUTOR_MAP.get(executorName); } /** * 線程池注冊 * @param executorName 線程池名字 */ public static void registry(String executorName, Executor executor) { //注冊 EXECUTOR_MAP.put(executorName, executorWrapper); } /** * 刷新線程池參數 * @param executorName 線程池名字 * @param properties 線程池參數 */ public static void refresh(String executorName, ThreadPoolProperties properties) { Executor executor = EXECUTOR_MAP.get(executorName) //刷新參數 //....... } }
如何創(chuàng)建線程池?
STEP 1. 我們可以使用yml配置文件的方式配置一個線程池,將線程池實例的創(chuàng)建交由Spring容器。
相關配置
public class DtpProperties { ? ? private List<ThreadPoolProperties> executors; } public class ThreadPoolProperties { ? ? /** ? ? ?* 標識每個線程池的唯一名字 ? ? ?*/ ? ? private String poolName; ? ? private String poolType = "common"; ? ? /** ? ? ?* 是否為守護線程 ? ? ?*/ ? ? private boolean isDaemon = false; ? ? /** ? ? ?* 以下都是核心參數 ? ? ?*/ ? ? private int corePoolSize = 1; ? ? private int maximumPoolSize = 1; ? ? private long keepAliveTime; ? ? private TimeUnit timeUnit = TimeUnit.SECONDS; ? ? private String queueType = "arrayBlockingQueue"; ? ? private int queueSize = 5; ? ? private String threadFactoryPrefix = "-td-"; ? ? private String RejectedExecutionHandler; }
yml example:
spring: dtp: executors: # 線程池1 - poolName: dtpExecutor1 corePoolSize: 5 maximumPoolSize: 10 # 線程池2 - poolName: dtpExecutor2 corePoolSize: 2 maximumPoolSize: 15
STEP 2 根據配置信息添加線程池的BeanDefinition
關鍵類
@Slf4j public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware { ? ? private Environment environment; ? ? @Override ? ? public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { ? ? ? ? log.info("注冊"); ? ? ? ? //綁定資源 ? ? ? ? DtpProperties dtpProperties = new DtpProperties(); ? ? ? ? ResourceBundlerUtil.bind(environment, dtpProperties); ? ? ? ? List<ThreadPoolProperties> executors = dtpProperties.getExecutors(); ? ? ? ? if (Objects.isNull(executors)) { ? ? ? ? ? ? log.info("未檢測本地到配置文件線程池"); ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? //注冊beanDefinition ? ? ? ? executors.forEach((executorProp) -> { ? ? ? ? ? ? BeanUtil.registerIfAbsent(registry, executorProp); ? ? ? ? }); ? ? } ? ? @Override ? ? public void setEnvironment(Environment environment) { ? ? ? ? this.environment = environment; ? ? } } /** ?* ?* 工具類 ?* ?*/ public class BeanUtil{ ? ? public static void registerIfAbsent(BeanDefinitionRegistry registry, ThreadPoolProperties executorProp) { ? ? ? ? register(registry, executorProp.getPoolName(), executorProp); ? ? } ? ? public static void register(BeanDefinitionRegistry registry, String beanName, ThreadPoolProperties executorProp) { ? ? ? ? Class<? extends Executor> executorType = ExecutorType.getClazz(executorProp.getPoolType()); ? ? ? ? Object[] args = assembleArgs(executorProp); ? ? ? ? register(registry, beanName, executorType, args); ? ? } ? ? public static void register(BeanDefinitionRegistry registry, String beanName, Class<?> clazz, Object[] args) { ? ? ? ? BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz); ? ? ? ? for (Object arg : args) { ? ? ? ? ? ? builder.addConstructorArgValue(arg); ? ? ? ? } ? ? ? ? registry.registerBeanDefinition(beanName, builder.getBeanDefinition()); ? ? } ? ? private static Object[] assembleArgs(ThreadPoolProperties executorProp) { ? ? ? ? return new Object[]{ ? ? ? ? ? ? ? ? executorProp.getCorePoolSize(), ? ? ? ? ? ? ? ? executorProp.getMaximumPoolSize(), ? ? ? ? ? ? ? ? executorProp.getKeepAliveTime(), ? ? ? ? ? ? ? ? executorProp.getTimeUnit(), ? ? ? ? ? ? ? ? QueueType.getInstance(executorProp.getQueueType(), executorProp.getQueueSize()), ? ? ? ? ? ? ? ? new NamedThreadFactory( ? ? ? ? ? ? ? ? ? ? ? ? executorProp.getPoolName() + executorProp.getThreadFactoryPrefix(), ? ? ? ? ? ? ? ? ? ? ? ? executorProp.isDaemon() ? ? ? ? ? ? ? ? ), ? ? ? ? ? ? ? ? //先默認不做設置 ? ? ? ? ? ? ? ? RejectPolicy.ABORT.getValue() ? ? ? ? }; ? ? } }
下面解釋一下這個類的作用,environment實例中儲存著spring啟動時解析的yml配置,所以我們spring提供的Binder將配置綁定到我們前面定義的DtpProperties類中,方便后續(xù)使用。接下來的比較簡單,就是將線程池的BeanDefinition注冊到IOC容器中,讓spring去幫我們實例化這個bean。
STEP 3. 將已經實例化的線程池注冊到核心類 DtpRegistry 中
我們注冊了 beanDefinition 后,spring會幫我們實例化出來, 在這之后我們可以根據需要將這個bean進行進一步的處理,spring也提供了很多機制讓我們對bean的生命周期管理進行更多的擴展。對應到這里我們就是將實例化出來的線程池注冊到核心類 DtpRegistry 中進行管理。
這里我們使用 BeanPostProcessor 進行處理。
@Slf4j public class DtpBeanPostProcessor implements BeanPostProcessor { ? ? private DefaultListableBeanFactory beanFactory; ? ? @Override ? ? public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { ? ? ? ? if (bean instanceof DtpExecutor) { ? ? ? ? ? ? //直接納入管理 ? ? ? ? ? ? DtpRegistry.registry(beanName, (DtpExecutor) bean); ? ? ? ? } ? ? ? ? return bean; ? ? } }
這里的邏輯很簡單, 就是判斷一下這個bean是不是線程池,是就統(tǒng)一管理起來。
STEP 4. 啟用 BeanDefinitionRegistrar 和 BeanPostProcessor
在springboot程序中,只要加一個@MapperScan注解就能啟用mybatis的功能,我們可以學習其在spring中的啟用方式,自定義一個注解:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(DtpImportSelector.class) public @interface EnableDynamicThreadPool { }
其中,比較關鍵的是@Import注解,spring會導入注解中的類DtpImportSelector
而DtpImportSelector這個類實現了:
public class DtpImportSelector implements DeferredImportSelector { @Override public String[] selectImports(AnnotationMetadata importingClassMetadata) { return new String[]{ DtpImportBeanDefinitionRegistrar.class.getName(), DtpBeanPostProcessor.class.getName() }; } }
這樣,只要我們再啟動類或者配置類上加上@EnableDynamicThreadPool這個注解,spring就會將DtpImportBeanDefinitionRegistrar和DtpBeanPostProcessor這兩個類加入spring容器管理,從而實現我們的線程池的注冊。
@SpringBootApplication @EnableDynamicThreadPool public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
如何實現線程池配置的動態(tài)刷新
首先明確一點,對于線程池的實現類,例如:ThreadPoolExecutor等,都有提供核心參數對應的 set 方法,讓我們實現參數修改。因此,在核心類DtpRegistry中的refresh方法,我們可以這樣寫:
public class DtpRegistry { /** * 儲存線程池 */ private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>(); /** * 刷新線程池參數 * @param executorName 線程池名字 * @param properties 線程池參數 */ public static void refresh(String executorName, ThreadPoolProperties properties) { ThreadPoolExecutor executor = EXECUTOR_MAP.get(executorName) //設置參數 executor.setCorePoolSize(...); executor.setMaximumPoolSize(...); ...... } }
而這些新參數怎么來呢?我們可以引入Nacos、Apollo等配置中心,實現他們的監(jiān)聽器方法,在監(jiān)聽器方法里調用DtpRegistry的refresh方法刷新即可。
到此這篇關于java動態(tài)線程池的簡單實現思路的文章就介紹到這了,更多相關java動態(tài)線程池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!