A Source Code Walkthrough of PowerJobAutoConfiguration Auto-Configuration
Preface
This article walks through PowerJobAutoConfiguration and the classes it wires together.
PowerJobProperties
tech/powerjob/worker/autoconfigure/PowerJobProperties.java
@ConfigurationProperties(prefix = "powerjob")
public class PowerJobProperties {

    private final Worker worker = new Worker();

    public Worker getWorker() {
        return worker;
    }

    //......
}
PowerJobProperties uses the configuration prefix powerjob; the main settings all live on its worker property.
Worker
/**
 * Powerjob worker configuration properties.
 */
@Setter
@Getter
public static class Worker {

    /**
     * Whether to enable PowerJob Worker
     */
    private boolean enabled = true;

    /**
     * Name of application, String type. Total length of this property should be no more than 255
     * characters. This is one of the required properties when registering a new application. This
     * property should be assigned with the same value as what you entered for the appName.
     */
    private String appName;

    /**
     * Akka port of Powerjob-worker, optional value. Default value of this property is 27777.
     * If multiple PowerJob-worker nodes were deployed, different, unique ports should be assigned.
     * Deprecated, please use 'port'
     */
    @Deprecated
    private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;

    /**
     * port
     */
    private Integer port;

    /**
     * Address(es) of Powerjob-server node(s). Ip:port or domain.
     * Example of single Powerjob-server node:
     * <p>
     * 127.0.0.1:7700
     * </p>
     * Example of Powerjob-server cluster:
     * <p>
     * 192.168.0.10:7700,192.168.0.11:7700,192.168.0.12:7700
     * </p>
     */
    private String serverAddress;

    /**
     * Protocol for communication between WORKER and server
     */
    private Protocol protocol = Protocol.AKKA;

    /**
     * Local store strategy for H2 database. {@code disk} or {@code memory}.
     */
    private StoreStrategy storeStrategy = StoreStrategy.DISK;

    /**
     * Max length of response result. Result that is longer than the value will be truncated.
     * {@link ProcessResult} max length for #msg
     */
    private int maxResultLength = 8192;

    /**
     * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
     * Test mode is used for conditions that your have no powerjob-server in your develop env, so you can't start up the application
     */
    private boolean enableTestMode = false;

    /**
     * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored.
     * {@link WorkflowContext} max length for #appendedContextData
     */
    private int maxAppendedWfContextLength = 8192;

    private String tag;

    /**
     * Max numbers of LightTaskTacker
     */
    private Integer maxLightweightTaskNum = 1024;

    /**
     * Max numbers of HeavyTaskTacker
     */
    private Integer maxHeavyweightTaskNum = 64;

    /**
     * Interval(s) of worker health report
     */
    private Integer healthReportInterval = 10;
}
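Worker defines enabled (default true), appName, akkaPort (deprecated in favor of port; it defaults to RemoteConstant.DEFAULT_WORKER_PORT, which the Javadoc gives as 27777), port (an Integer with no default of its own), serverAddress (multiple ip:port entries separated by commas), protocol (default AKKA; HTTP is also supported), storeStrategy (default DISK, with MEMORY as the alternative; it selects the storage mode of the local H2 database), maxResultLength (maximum length of a returned result, default 8192), enableTestMode (default false; intended for debugging when no server is deployed), maxAppendedWfContextLength (default 8192), tag, maxLightweightTaskNum (default 1024), maxHeavyweightTaskNum (default 64), and healthReportInterval (default 10, in seconds).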
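As a rough illustration of how these fields are typically set, the sketch below loads a handful of powerjob.worker.* keys with plain java.util.Properties. The kebab-case key names assume Spring Boot's relaxed binding of the field names shown above, and the values (application name, addresses) are made up for the example. The class WorkerPropertiesSketch is hypothetical and is not how Spring itself binds PowerJobProperties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class WorkerPropertiesSketch {

    // Example values only; key names assume Spring Boot's kebab-case relaxed binding.
    static final String SAMPLE = String.join("\n",
            "powerjob.worker.enabled=true",
            "powerjob.worker.app-name=demo-app",
            "powerjob.worker.port=27777",
            "powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701",
            "powerjob.worker.protocol=http",
            "powerjob.worker.store-strategy=disk");

    // Parses the text in java.util.Properties format.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        // Print the keys in sorted order together with their values.
        props.stringPropertyNames().stream().sorted()
                .forEach(key -> System.out.println(key + " = " + props.getProperty(key)));
    }
}
```

In a real application the same keys would normally go into application.properties (or the YAML equivalent) and be bound by Spring Boot.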
PowerJobWorkerConfig
tech/powerjob/worker/common/PowerJobWorkerConfig.java
@Getter
@Setter
public class PowerJobWorkerConfig {

    /**
     * AppName, recommend to use the name of this project
     * Applications should be registered by powerjob-console in advance to prevent error.
     */
    private String appName;

    /**
     * Worker port
     * Random port is enabled when port is set with non-positive number.
     */
    private int port = RemoteConstant.DEFAULT_WORKER_PORT;

    /**
     * Address of powerjob-server node(s)
     * Do not mistake for ActorSystem port. Do not add any prefix, i.e. http://.
     */
    private List<String> serverAddress = Lists.newArrayList();

    /**
     * Protocol for communication between WORKER and server
     */
    private Protocol protocol = Protocol.AKKA;

    /**
     * Max length of response result. Result that is longer than the value will be truncated.
     * {@link ProcessResult} max length for #msg
     */
    private int maxResultLength = 8096;

    /**
     * User-defined context object, which is passed through to the TaskContext#userContext property
     * Usage Scenarios: The container Java processor needs to use the Spring bean of the host application, where you can pass in the ApplicationContext and get the bean in the Processor
     */
    private Object userContext;

    /**
     * Internal persistence method, DISK or MEMORY
     * Normally you don't need to care about this configuration
     */
    private StoreStrategy storeStrategy = StoreStrategy.DISK;

    /**
     * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
     * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
     */
    private boolean enableTestMode = false;

    /**
     * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.
     * {@link WorkflowContext} max length for #appendedContextData
     */
    private int maxAppendedWfContextLength = 8192;

    /**
     * user-customized system metrics collector
     */
    private SystemMetricsCollector systemMetricsCollector;

    /**
     * Processor factory for custom logic, generally used for IOC framework processor bean injection that is not officially supported by PowerJob
     */
    private List<ProcessorFactory> processorFactoryList;

    private String tag;

    /**
     * Max numbers of LightTaskTacker
     */
    private Integer maxLightweightTaskNum = 1024;

    /**
     * Max numbers of HeavyTaskTacker
     */
    private Integer maxHeavyweightTaskNum = 64;

    /**
     * Interval(s) of worker health report
     */
    private Integer healthReportInterval = 10;
}
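PowerJobWorkerConfig largely mirrors PowerJobProperties.Worker. The differences visible in the two listings are: serverAddress is a List<String> rather than a comma-separated String; port is a plain int that defaults to RemoteConstant.DEFAULT_WORKER_PORT; maxResultLength defaults to 8096 instead of 8192; there is no enabled or akkaPort field; and three extra fields have no counterpart in Worker (userContext, systemMetricsCollector, processorFactoryList).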
PowerJobAutoConfiguration
tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java
@Configuration
@EnableConfigurationProperties(PowerJobProperties.class)
@ConditionalOnProperty(prefix = "powerjob.worker", name = "enabled", havingValue = "true", matchIfMissing = true)
public class PowerJobAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public PowerJobSpringWorker initPowerJob(PowerJobProperties properties) {

        PowerJobProperties.Worker worker = properties.getWorker();

        /*
         * Address of PowerJob-server node(s). Do not mistake for ActorSystem port. Do not add
         * any prefix, i.e. http://.
         */
        CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty! " +
                "if you don't want to enable powerjob, please config program arguments: powerjob.worker.enabled=false");
        List<String> serverAddress = Arrays.asList(worker.getServerAddress().split(","));

        /*
         * Create OhMyConfig object for setting properties.
         */
        PowerJobWorkerConfig config = new PowerJobWorkerConfig();
        /*
         * Configuration of worker port. Random port is enabled when port is set with non-positive number.
         */
        if (worker.getPort() != null) {
            config.setPort(worker.getPort());
        } else {
            int port = worker.getAkkaPort();
            if (port <= 0) {
                port = NetUtils.getRandomPort();
            }
            config.setPort(port);
        }
        /*
         * appName, name of the application. Applications should be registered in advance to prevent
         * error. This property should be the same with what you entered for appName when getting
         * registered.
         */
        config.setAppName(worker.getAppName());
        config.setServerAddress(serverAddress);
        config.setProtocol(worker.getProtocol());
        /*
         * For non-Map/MapReduce tasks, {@code memory} is recommended for speeding up calculation.
         * Map/MapReduce tasks may produce batches of subtasks, which could lead to OutOfMemory
         * exception or error, {@code disk} should be applied.
         */
        config.setStoreStrategy(worker.getStoreStrategy());
        /*
         * When enabledTestMode is set as true, PowerJob-worker no longer connects to PowerJob-server
         * or validate appName.
         */
        config.setEnableTestMode(worker.isEnableTestMode());
        /*
         * Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignored.
         */
        config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength());

        config.setTag(worker.getTag());

        config.setMaxHeavyweightTaskNum(worker.getMaxHeavyweightTaskNum());

        config.setMaxLightweightTaskNum(worker.getMaxLightweightTaskNum());

        config.setHealthReportInterval(worker.getHealthReportInterval());
        /*
         * Create PowerJobSpringWorker object and set properties.
         */
        return new PowerJobSpringWorker(config);
    }
}
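PowerJobAutoConfiguration enables PowerJobProperties and is applied automatically unless powerjob.worker.enabled is set to false. It then registers a PowerJobSpringWorker bean. The method name initPowerJob is not a great choice, because the bean ends up being named initPowerJob. The method mainly converts the PowerJobProperties.Worker settings into a PowerJobWorkerConfig: it requires serverAddress to be non-null and splits it on commas; when port is not set it falls back to the deprecated akkaPort and picks a random port if that value is less than or equal to 0. In the version shown here maxResultLength is not copied over, so PowerJobWorkerConfig keeps its own default. Finally the method creates the PowerJobSpringWorker from the PowerJobWorkerConfig.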
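The port fallback and the address splitting can be sketched in plain Java without Spring. This is a minimal sketch under stated assumptions: WorkerPortSketch, randomPort and parseServers are hypothetical names, randomPort is only a stand-in for NetUtils.getRandomPort() (whose real implementation is not shown in this article), and parseServers trims whitespace and drops empty entries, which the split(",") call in the source does not do.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

class WorkerPortSketch {

    // Hypothetical stand-in for NetUtils.getRandomPort(): ask the OS for a free port.
    static int randomPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("no free port available", e);
        }
    }

    // Mirrors the branch in initPowerJob: 'port' wins when present, else fall back to akkaPort.
    static int resolvePort(Integer port, int akkaPort) {
        if (port != null) {
            return port; // passed through unchanged, even if it is non-positive
        }
        return akkaPort <= 0 ? randomPort() : akkaPort;
    }

    // Variation on split(","): also trims whitespace and drops empty entries.
    static List<String> parseServers(String raw) {
        List<String> servers = new ArrayList<>();
        for (String part : raw.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    public static void main(String[] args) {
        System.out.println(resolvePort(8081, 27777));
        System.out.println(resolvePort(null, 27777));
        System.out.println(parseServers("192.168.0.10:7700, 192.168.0.11:7700"));
    }
}
```

Note that an explicit non-positive port is handed to the config as-is in the source; only the akkaPort branch picks a random port inside initPowerJob.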
PowerJobSpringWorker
tech/powerjob/worker/PowerJobSpringWorker.java
public class PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, DisposableBean {

    /**
     * Composition over inheritance: hold a PowerJobWorker and reset the ProcessorFactory internally, which is more elegant
     */
    private PowerJobWorker powerJobWorker;
    private final PowerJobWorkerConfig config;

    public PowerJobSpringWorker(PowerJobWorkerConfig config) {
        this.config = config;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        powerJobWorker = new PowerJobWorker(config);
        powerJobWorker.init();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);

        BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext);
        // append BuiltInSpringProcessorFactory
        List<ProcessorFactory> processorFactories = Lists.newArrayList(
                Optional.ofNullable(config.getProcessorFactoryList())
                        .orElse(Collections.emptyList()));
        processorFactories.add(springProcessorFactory);
        processorFactories.add(springMethodProcessorFactory);
        config.setProcessorFactoryList(processorFactories);
    }

    @Override
    public void destroy() throws Exception {
        powerJobWorker.destroy();
    }
}
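PowerJobSpringWorker implements ApplicationContextAware, InitializingBean and DisposableBean. Its setApplicationContext method builds a processorFactories list from whatever is already in the config, appends springProcessorFactory and springMethodProcessorFactory, and writes the list back to the config. Its afterPropertiesSet method creates the PowerJobWorker and calls init, and its destroy method calls powerJobWorker.destroy(). Because Spring invokes the Aware callbacks before afterPropertiesSet, the factories are already in the config by the time init runs.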
PowerJobWorker
tech/powerjob/worker/PowerJobWorker.java
@Slf4j
public class PowerJobWorker {

    private final RemoteEngine remoteEngine;
    protected final WorkerRuntime workerRuntime;
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    public PowerJobWorker(PowerJobWorkerConfig config) {
        this.workerRuntime = new WorkerRuntime();
        this.remoteEngine = new PowerJobRemoteEngine();
        workerRuntime.setWorkerConfig(config);
    }

    //......
}
PowerJobWorker defines the remoteEngine, workerRuntime and initialized fields; its constructor stores the PowerJobWorkerConfig in workerRuntime.
init
public void init() throws Exception {

    if (!initialized.compareAndSet(false, true)) {
        log.warn("[PowerJobWorker] please do not repeat the initialization");
        return;
    }

    Stopwatch stopwatch = Stopwatch.createStarted();
    log.info("[PowerJobWorker] start to initialize PowerJobWorker...");

    PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
    CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");

    try {
        PowerBannerPrinter.print();
        // validate appName
        if (!config.isEnableTestMode()) {
            assertAppName();
        } else {
            log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
        }

        // initialize metadata
        String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
        workerRuntime.setWorkerAddress(workerAddress);

        // initialize thread pools
        final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
        workerRuntime.setExecutorManager(executorManager);

        // initialize ProcessorLoader
        ProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);
        workerRuntime.setProcessorLoader(processorLoader);

        // initialize actors
        TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
        ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
        WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);

        // initialize the communication engine
        EngineConfig engineConfig = new EngineConfig()
                .setType(config.getProtocol().name())
                .setServerType(ServerType.WORKER)
                .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
                .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));

        EngineOutput engineOutput = remoteEngine.start(engineConfig);
        workerRuntime.setTransporter(engineOutput.getTransporter());

        // connect to the server
        ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());

        serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
        workerRuntime.setServerDiscoveryService(serverDiscoveryService);

        log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");

        // initialize the logging system
        OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
        workerRuntime.setOmsLogHandler(omsLogHandler);

        // initialize storage
        TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
        taskPersistenceService.init();
        workerRuntime.setTaskPersistenceService(taskPersistenceService);
        log.info("[PowerJobWorker] local storage initialized successfully.");

        // initialize scheduled tasks
        workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, config.getHealthReportInterval(), TimeUnit.SECONDS);
        workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);

        log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);
    } catch (Exception e) {
        log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);
        throw e;
    }
}
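The init method first guards against repeated initialization with initialized.compareAndSet, then prints the banner via PowerBannerPrinter.print() and runs assertAppName unless test mode is on. After that it sets workerAddress, executorManager and processorLoader on workerRuntime, creates the taskTrackerActor, processorTrackerActor and workerActor, calls remoteEngine.start(engineConfig) and stores the resulting transporter, starts serverDiscoveryService, sets up omsLogHandler, initializes taskPersistenceService, and finally schedules WorkerHealthReporter (at a fixed rate of healthReportInterval seconds) and logSubmitter (with a fixed delay of 5 seconds).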
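The repeated-initialization guard at the top of init relies on AtomicBoolean.compareAndSet, which lets exactly one caller flip the flag from false to true. A minimal, self-contained sketch of that pattern follows; InitOnceSketch and its counter are hypothetical and only stand in for the real start-up work.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class InitOnceSketch {

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger initRuns = new AtomicInteger();

    // Returns true only for the single caller that actually performs the initialization.
    boolean init() {
        if (!initialized.compareAndSet(false, true)) {
            return false; // another call already claimed the initialization
        }
        initRuns.incrementAndGet(); // placeholder for the real start-up work
        return true;
    }

    int initRuns() {
        return initRuns.get();
    }

    public static void main(String[] args) {
        InitOnceSketch worker = new InitOnceSketch();
        System.out.println(worker.init());
        System.out.println(worker.init());
        System.out.println(worker.initRuns());
    }
}
```

One consequence of flipping the flag before doing the work, which also applies to the init method shown above: if initialization throws, the flag stays true, so a later call is rejected as a repeat rather than retried.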
assertAppName
private void assertAppName() {

    PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
    String appName = config.getAppName();
    Objects.requireNonNull(appName, "appName can't be empty!");

    String url = "http://%s/server/assert?appName=%s";
    for (String server : config.getServerAddress()) {
        String realUrl = String.format(url, server, appName);
        try {
            String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
            ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
            if (resultDTO.isSuccess()) {
                Long appId = Long.valueOf(resultDTO.getData().toString());
                log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
                workerRuntime.setAppId(appId);
                return;
            } else {
                log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
                throw new PowerJobException(resultDTO.getMessage());
            }
        } catch (PowerJobException oe) {
            throw oe;
        } catch (Exception ignore) {
            log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
        }
    }
    log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
    throw new PowerJobException("no server available!");
}
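The assertAppName method iterates over the configured servers and calls http://%s/server/assert?appName=%s on each to obtain the appId for the appName, which it then stores in workerRuntime. It returns as soon as one server answers successfully. If a server answers but rejects the appName, the resulting PowerJobException is rethrown immediately; any other exception is only logged and the next server is tried. If no server can be reached, it throws a PowerJobException with the message "no server available!".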
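The control flow of this loop (first success wins, a definitive rejection aborts, other failures move on to the next server) can be sketched independently of HTTP. In the sketch below everything is hypothetical: ServerFailoverSketch, RejectedException (a stand-in for PowerJobException) and the lookup function, which replaces the HttpUtils.get call plus JSON parsing.

```java
import java.util.List;
import java.util.function.Function;

class ServerFailoverSketch {

    // Hypothetical stand-in for PowerJobException: a definitive rejection from a reachable server.
    static class RejectedException extends RuntimeException {
        RejectedException(String message) {
            super(message);
        }
    }

    // Tries each server in order; 'lookup' is a hypothetical call that returns the appId.
    static long resolveAppId(List<String> servers, Function<String, Long> lookup) {
        for (String server : servers) {
            try {
                return lookup.apply(server); // first success wins
            } catch (RejectedException e) {
                throw e; // the server answered "no": trying other servers will not help
            } catch (RuntimeException e) {
                // unreachable or broken server: fall through to the next one
            }
        }
        throw new IllegalStateException("no server available!");
    }

    public static void main(String[] args) {
        long appId = resolveAppId(List.of("10.0.0.1:7700", "10.0.0.2:7700"), server -> {
            if (server.startsWith("10.0.0.1")) {
                throw new RuntimeException("connection refused");
            }
            return 42L;
        });
        System.out.println(appId);
    }
}
```

Separating the two exception types is what keeps a misconfigured appName from being masked as a connectivity problem.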
buildProcessorLoader
private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) {
    List<ProcessorFactory> customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList());
    List<ProcessorFactory> finalPF = Lists.newArrayList(customPF);

    // append the 2 system ProcessorLoader at the end
    finalPF.add(new BuiltInDefaultProcessorFactory());
    finalPF.add(new JarContainerProcessorFactory(runtime));

    return new PowerJobProcessorLoader(finalPF);
}
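buildProcessorLoader takes the existing processorFactoryList (which, under Spring, already contains BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory), appends BuiltInDefaultProcessorFactory and JarContainerProcessorFactory to a copy of it, and wraps the result in a PowerJobProcessorLoader.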
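Both setApplicationContext and buildProcessorLoader use the same idiom: copy a possibly-null list, then append. The sketch below replays the two steps with plain strings in place of real factories to show the resulting order. FactoryChainSketch and append are hypothetical names, and whether PowerJobProcessorLoader consults the factories in list order is an assumption, since its code is not shown in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class FactoryChainSketch {

    // Copies a possibly-null list and appends extras, leaving the input untouched.
    static List<String> append(List<String> existing, String... extras) {
        List<String> result = new ArrayList<>(
                Optional.ofNullable(existing).orElse(Collections.emptyList()));
        Collections.addAll(result, extras);
        return result;
    }

    public static void main(String[] args) {
        // Step 1: what setApplicationContext does to a config without custom factories.
        List<String> afterSpring = append(null,
                "BuiltInSpringProcessorFactory", "BuildInSpringMethodProcessorFactory");
        // Step 2: what buildProcessorLoader appends on top of that.
        List<String> finalChain = append(afterSpring,
                "BuiltInDefaultProcessorFactory", "JarContainerProcessorFactory");
        finalChain.forEach(System.out::println);
    }
}
```

Copying before appending means user-supplied factories always stay at the front of the list and the caller's original list is never mutated.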
destroy
public void destroy() throws Exception {
    workerRuntime.getExecutorManager().shutdown();
    remoteEngine.close();
}
The destroy method calls workerRuntime.getExecutorManager().shutdown() and then remoteEngine.close().
Summary
PowerJobAutoConfiguration creates a PowerJobSpringWorker from the PowerJobProperties.Worker settings, and PowerJobSpringWorker in turn brings PowerJobWorker under the management of the Spring container. Its setApplicationContext method adds BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory to the config's processorFactoryList. The worker's init method validates the appName, initializes the thread pools, the ProcessorLoader, the actors, the remoteEngine, serverDiscoveryService, omsLogHandler and taskPersistenceService, and schedules WorkerHealthReporter and logSubmitter.