
PowerJobAutoConfiguration Auto-Configuration: Source Code Walkthrough

Updated: 2023-12-21 16:44:54   Author: codecraft
This article walks through the source code of the PowerJobAutoConfiguration auto-configuration flow, for readers who need a reference.

This article takes a look at PowerJobAutoConfiguration.

PowerJobProperties

tech/powerjob/worker/autoconfigure/PowerJobProperties.java

@ConfigurationProperties(prefix = "powerjob")
public class PowerJobProperties {
    private final Worker worker = new Worker();
    public Worker getWorker() {
        return worker;
    }
    //......
}
PowerJobProperties uses the configuration prefix powerjob; the main settings all live under worker.

Worker

/**
     * Powerjob worker configuration properties.
     */
    @Setter
    @Getter
    public static class Worker {
        /**
         * Whether to enable PowerJob Worker
         */
        private boolean enabled = true;
        /**
         * Name of application, String type. Total length of this property should be no more than 255
         * characters. This is one of the required properties when registering a new application. This
         * property should be assigned with the same value as what you entered for the appName.
         */
        private String appName;
        /**
         * Akka port of Powerjob-worker, optional value. Default value of this property is 27777.
         * If multiple PowerJob-worker nodes were deployed, different, unique ports should be assigned.
         * Deprecated, please use 'port'
         */
        @Deprecated
        private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
        /**
         * port
         */
        private Integer port;
        /**
         * Address(es) of Powerjob-server node(s). Ip:port or domain.
         * Example of single Powerjob-server node:
         * <p>
         * 127.0.0.1:7700
         * </p>
         * Example of Powerjob-server cluster:
         * <p>
         * 192.168.0.10:7700,192.168.0.11:7700,192.168.0.12:7700
         * </p>
         */
        private String serverAddress;
        /**
         * Protocol for communication between WORKER and server
         */
        private Protocol protocol = Protocol.AKKA;
        /**
         * Local store strategy for H2 database. {@code disk} or {@code memory}.
         */
        private StoreStrategy storeStrategy = StoreStrategy.DISK;
        /**
         * Max length of response result. Result that is longer than the value will be truncated.
         * {@link ProcessResult} max length for #msg
         */
        private int maxResultLength = 8192;
        /**
         * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
         * Test mode is used for conditions that your have no powerjob-server in your develop env, so you can't start up the application
         */
        private boolean enableTestMode = false;
        /**
         * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored.
         * {@link WorkflowContext} max length for #appendedContextData
         */
        private int maxAppendedWfContextLength = 8192;
        private String tag;
        /**
         * Max numbers of LightTaskTacker
         */
        private Integer maxLightweightTaskNum = 1024;
        /**
         * Max numbers of HeavyTaskTacker
         */
        private Integer maxHeavyweightTaskNum = 64;
        /**
         * Interval(s) of worker health report
         */
        private Integer healthReportInterval = 10;
    }

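Worker defines enabled, appName, akkaPort (deprecated, default 27777), port (no default of its own; akkaPort is the fallback when it is unset), serverAddress (multiple ip:port entries separated by commas), protocol (default AKKA; HTTP is also supported), storeStrategy (default DISK, with MEMORY also supported; it selects the storage mode of the local H2 database), maxResultLength (maximum length of the returned result, default 8192), enableTestMode (default false; intended for debugging when no server is deployed), maxAppendedWfContextLength (default 8192), tag, maxLightweightTaskNum (default 1024), maxHeavyweightTaskNum (default 64), and healthReportInterval (default 10 seconds).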
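To make the shape of these settings concrete, here is a minimal sketch of what they might look like in application.properties. The keys assume Spring Boot's usual kebab-case form of the Worker field names, and every value (application name, addresses) is hypothetical. The code parses the text with java.util.Properties only to make the key/value layout visible; it is not Spring's binder.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class WorkerPropertiesSketch {

    // Sample application.properties content. Keys are the assumed kebab-case
    // form of the PowerJobProperties.Worker fields; all values are hypothetical.
    static final String SAMPLE = String.join("\n",
            "powerjob.worker.enabled=true",
            "powerjob.worker.app-name=demo-app",
            "powerjob.worker.port=27777",
            "powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701",
            "powerjob.worker.protocol=akka",
            "powerjob.worker.store-strategy=disk",
            "powerjob.worker.health-report-interval=10");

    // Parses the text with java.util.Properties. This is NOT Spring's binder;
    // it only shows which keys exist and what their raw values look like.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("unreadable properties text", e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println(props.getProperty("powerjob.worker.server-address"));
    }
}
```

Note that serverAddress is a single comma-separated string at this level; it only becomes a list later, inside the auto-configuration.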

PowerJobWorkerConfig

tech/powerjob/worker/common/PowerJobWorkerConfig.java

@Getter
@Setter
public class PowerJobWorkerConfig {
    /**
     * AppName, recommend to use the name of this project
     * Applications should be registered by powerjob-console in advance to prevent error.
     */
    private String appName;
    /**
     * Worker port
     * Random port is enabled when port is set with non-positive number.
     */
    private int port = RemoteConstant.DEFAULT_WORKER_PORT;
    /**
     * Address of powerjob-server node(s)
     * Do not mistake for ActorSystem port. Do not add any prefix, i.e. http://.
     */
    private List<String> serverAddress = Lists.newArrayList();
    /**
     * Protocol for communication between WORKER and server
     */
    private Protocol protocol = Protocol.AKKA;
    /**
     * Max length of response result. Result that is longer than the value will be truncated.
     * {@link ProcessResult} max length for #msg
     */
    private int maxResultLength = 8096;
    /**
     * User-defined context object, which is passed through to the TaskContext#userContext property
     * Usage Scenarios: The container Java processor needs to use the Spring bean of the host application, where you can pass in the ApplicationContext and get the bean in the Processor
     */
    private Object userContext;
    /**
     * Internal persistence method, DISK or MEMORY
     * Normally you don't need to care about this configuration
     */
    private StoreStrategy storeStrategy = StoreStrategy.DISK;
    /**
     * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
     * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
     */
    private boolean enableTestMode = false;
    /**
     * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.
     * {@link WorkflowContext} max length for #appendedContextData
     */
    private int maxAppendedWfContextLength = 8192;
    /**
     * user-customized system metrics collector
     */
    private SystemMetricsCollector systemMetricsCollector;
    /**
     * Processor factory for custom logic, generally used for IOC framework processor bean injection that is not officially supported by PowerJob
     */
    private List<ProcessorFactory> processorFactoryList;

    private String tag;
    /**
     * Max numbers of LightTaskTacker
     */
    private Integer maxLightweightTaskNum = 1024;
    /**
     * Max numbers of HeavyTaskTacker
     */
    private Integer maxHeavyweightTaskNum = 64;
    /**
     * Interval(s) of worker health report
     */
    private Integer healthReportInterval = 10;

}
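PowerJobWorkerConfig largely mirrors PowerJobProperties.Worker. It additionally carries userContext, systemMetricsCollector, and processorFactoryList, holds serverAddress as a List<String> rather than a single string, and defaults maxResultLength to 8096 rather than 8192.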

PowerJobAutoConfiguration

tech/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java

@Configuration
@EnableConfigurationProperties(PowerJobProperties.class)
@ConditionalOnProperty(prefix = "powerjob.worker", name = "enabled", havingValue = "true", matchIfMissing = true)
public class PowerJobAutoConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public PowerJobSpringWorker initPowerJob(PowerJobProperties properties) {
        PowerJobProperties.Worker worker = properties.getWorker();
        /*
         * Address of PowerJob-server node(s). Do not mistake for ActorSystem port. Do not add
         * any prefix, i.e. http://.
         */
        CommonUtils.requireNonNull(worker.getServerAddress(), "serverAddress can't be empty! " +
            "if you don't want to enable powerjob, please config program arguments: powerjob.worker.enabled=false");
        List<String> serverAddress = Arrays.asList(worker.getServerAddress().split(","));
        /*
         * Create OhMyConfig object for setting properties.
         */
        PowerJobWorkerConfig config = new PowerJobWorkerConfig();
        /*
         * Configuration of worker port. Random port is enabled when port is set with non-positive number.
         */
        if (worker.getPort() != null) {
            config.setPort(worker.getPort());
        } else {
            int port = worker.getAkkaPort();
            if (port <= 0) {
                port = NetUtils.getRandomPort();
            }
            config.setPort(port);
        }
        /*
         * appName, name of the application. Applications should be registered in advance to prevent
         * error. This property should be the same with what you entered for appName when getting
         * registered.
         */
        config.setAppName(worker.getAppName());
        config.setServerAddress(serverAddress);
        config.setProtocol(worker.getProtocol());
        /*
         * For non-Map/MapReduce tasks, {@code memory} is recommended for speeding up calculation.
         * Map/MapReduce tasks may produce batches of subtasks, which could lead to OutOfMemory
         * exception or error, {@code disk} should be applied.
         */
        config.setStoreStrategy(worker.getStoreStrategy());
        /*
         * When enabledTestMode is set as true, PowerJob-worker no longer connects to PowerJob-server
         * or validate appName.
         */
        config.setEnableTestMode(worker.isEnableTestMode());
        /*
         * Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignored.
         */
        config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength());
        config.setTag(worker.getTag());
        config.setMaxHeavyweightTaskNum(worker.getMaxHeavyweightTaskNum());
        config.setMaxLightweightTaskNum(worker.getMaxLightweightTaskNum());
        config.setHealthReportInterval(worker.getHealthReportInterval());
        /*
         * Create PowerJobSpringWorker object and set properties.
         */
        return new PowerJobSpringWorker(config);
    }
}
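PowerJobAutoConfiguration enables PowerJobProperties and applies automatically unless powerjob.worker.enabled is set to false. It then registers a PowerJobSpringWorker bean. The method name initPowerJob is not ideal, because the bean ends up being named initPowerJob. The method mainly converts PowerJobProperties.Worker into a PowerJobWorkerConfig: serverAddress is required and is split on commas; port is used as-is when it is set, otherwise akkaPort is used, with a random port chosen when akkaPort is less than or equal to 0. In the code shown, maxResultLength is not copied over, so the PowerJobWorkerConfig default applies. Finally, the method creates the PowerJobSpringWorker from the PowerJobWorkerConfig.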
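The port fallback and the address splitting can be isolated in a small, self-contained sketch. The class and method names here are hypothetical, and ServerSocket(0) stands in for NetUtils.getRandomPort(), whose implementation is not shown in this article.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.List;

final class WorkerPortSketch {

    // Mirrors the branch in initPowerJob: an explicit port wins; otherwise the
    // deprecated akkaPort is used, and a random port replaces a non-positive akkaPort.
    static int resolvePort(Integer port, int akkaPort) {
        if (port != null) {
            return port;
        }
        return akkaPort > 0 ? akkaPort : randomFreePort();
    }

    // Stand-in for NetUtils.getRandomPort() (an assumption): ask the OS for a free port.
    static int randomFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new IllegalStateException("no free port available", e);
        }
    }

    // Same call as in initPowerJob: a plain split on commas, without trimming.
    static List<String> splitServers(String serverAddress) {
        return Arrays.asList(serverAddress.split(","));
    }

    public static void main(String[] args) {
        System.out.println(resolvePort(null, 27777));
        System.out.println(splitServers("192.168.0.10:7700,192.168.0.11:7700"));
    }
}
```

Two details are worth noticing. An explicitly configured port is passed through unchanged, even when it is non-positive, so the random-port branch is only reached through akkaPort. And because split(",") does not trim, a value such as "a:7700, b:7700" would yield a second entry with a leading space.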

PowerJobSpringWorker

tech/powerjob/worker/PowerJobSpringWorker.java

public class PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
    /**
     * Composition over inheritance: hold a PowerJobWorker and reset the ProcessorFactory internally, which is more elegant
     */
    private PowerJobWorker powerJobWorker;
    private final PowerJobWorkerConfig config;
    public PowerJobSpringWorker(PowerJobWorkerConfig config) {
        this.config = config;
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        powerJobWorker = new PowerJobWorker(config);
        powerJobWorker.init();
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BuiltInSpringProcessorFactory springProcessorFactory = new BuiltInSpringProcessorFactory(applicationContext);
        BuildInSpringMethodProcessorFactory springMethodProcessorFactory = new BuildInSpringMethodProcessorFactory(applicationContext);
        // append BuiltInSpringProcessorFactory
        List<ProcessorFactory> processorFactories = Lists.newArrayList(
                Optional.ofNullable(config.getProcessorFactoryList())
                        .orElse(Collections.emptyList()));
        processorFactories.add(springProcessorFactory);
        processorFactories.add(springMethodProcessorFactory);
        config.setProcessorFactoryList(processorFactories);
    }
    @Override
    public void destroy() throws Exception {
        powerJobWorker.destroy();
    }
}
PowerJobSpringWorker implements ApplicationContextAware, InitializingBean, and DisposableBean. Its afterPropertiesSet method creates a PowerJobWorker and calls init; its destroy method calls powerJobWorker.destroy(). Its setApplicationContext method builds processorFactories, appends springProcessorFactory and springMethodProcessorFactory, and sets the list back on config. Spring invokes setApplicationContext before afterPropertiesSet, so both factories are already in config by the time init runs.

PowerJobWorker

tech/powerjob/worker/PowerJobWorker.java

@Slf4j
public class PowerJobWorker {
    private final RemoteEngine remoteEngine;
    protected final WorkerRuntime workerRuntime;
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    public PowerJobWorker(PowerJobWorkerConfig config) {
        this.workerRuntime = new WorkerRuntime();
        this.remoteEngine = new PowerJobRemoteEngine();
        workerRuntime.setWorkerConfig(config);
    }

    //......
}
PowerJobWorker defines the remoteEngine, workerRuntime, and initialized fields; its constructor sets the PowerJobWorkerConfig on workerRuntime.

init

public void init() throws Exception {
        if (!initialized.compareAndSet(false, true)) {
            log.warn("[PowerJobWorker] please do not repeat the initialization");
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        log.info("[PowerJobWorker] start to initialize PowerJobWorker...");
        PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
        CommonUtils.requireNonNull(config, "can't find PowerJobWorkerConfig, please set PowerJobWorkerConfig first");
        try {
            PowerBannerPrinter.print();
            // validate appName
            if (!config.isEnableTestMode()) {
                assertAppName();
            } else {
                log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
            }
            // initialize metadata
            String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
            workerRuntime.setWorkerAddress(workerAddress);
            // initialize thread pools
            final ExecutorManager executorManager = new ExecutorManager(workerRuntime.getWorkerConfig());
            workerRuntime.setExecutorManager(executorManager);
            // initialize ProcessorLoader
            ProcessorLoader processorLoader = buildProcessorLoader(workerRuntime);
            workerRuntime.setProcessorLoader(processorLoader);
            // initialize actors
            TaskTrackerActor taskTrackerActor = new TaskTrackerActor(workerRuntime);
            ProcessorTrackerActor processorTrackerActor = new ProcessorTrackerActor(workerRuntime);
            WorkerActor workerActor = new WorkerActor(workerRuntime, taskTrackerActor);
            // initialize the communication engine
            EngineConfig engineConfig = new EngineConfig()
                    .setType(config.getProtocol().name())
                    .setServerType(ServerType.WORKER)
                    .setBindAddress(new Address().setHost(NetUtils.getLocalHost()).setPort(config.getPort()))
                    .setActorList(Lists.newArrayList(taskTrackerActor, processorTrackerActor, workerActor));
            EngineOutput engineOutput = remoteEngine.start(engineConfig);
            workerRuntime.setTransporter(engineOutput.getTransporter());
            // connect to the server
            ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
            serverDiscoveryService.start(workerRuntime.getExecutorManager().getCoreExecutor());
            workerRuntime.setServerDiscoveryService(serverDiscoveryService);
            log.info("[PowerJobWorker] PowerJobRemoteEngine initialized successfully.");
            // initialize the logging system
            OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, workerRuntime.getTransporter(), serverDiscoveryService);
            workerRuntime.setOmsLogHandler(omsLogHandler);
            // initialize storage
            TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
            taskPersistenceService.init();
            workerRuntime.setTaskPersistenceService(taskPersistenceService);
            log.info("[PowerJobWorker] local storage initialized successfully.");
            // initialize scheduled tasks
            workerRuntime.getExecutorManager().getCoreExecutor().scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, config.getHealthReportInterval(), TimeUnit.SECONDS);
            workerRuntime.getExecutorManager().getCoreExecutor().scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);
            log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);
        }catch (Exception e) {
            log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);
            throw e;
        }
    }
init first uses initialized.compareAndSet to guard against repeated initialization, then prints the banner via PowerBannerPrinter.print(). Outside test mode it runs the assertAppName check. It then sets workerAddress, executorManager, and processorLoader on workerRuntime, creates taskTrackerActor, processorTrackerActor, and workerActor, calls remoteEngine.start(engineConfig) and serverDiscoveryService.start, sets omsLogHandler, initializes taskPersistenceService, and schedules WorkerHealthReporter and logSubmitter.
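The repeat-initialization guard at the top of init is a common idiom and can be shown in isolation. This is a minimal sketch with hypothetical names (InitOnceSketch, initCount); it reproduces only the compareAndSet check, not the rest of init.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class InitOnceSketch {

    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private int initCount = 0;

    // Returns true only for the first caller. compareAndSet flips the flag
    // atomically, so two threads racing into init cannot both pass the check.
    boolean init() {
        if (!initialized.compareAndSet(false, true)) {
            return false; // PowerJobWorker logs a warning and returns here
        }
        initCount++; // stands in for the real initialization work
        return true;
    }

    int getInitCount() {
        return initCount;
    }

    public static void main(String[] args) {
        InitOnceSketch worker = new InitOnceSketch();
        System.out.println(worker.init());
        System.out.println(worker.init());
    }
}
```

One consequence of this design, visible in the source above: the flag is set before the work starts and is not reset in the catch block, so a failed init cannot be retried on the same PowerJobWorker instance.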

assertAppName

private void assertAppName() {

        PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
        String appName = config.getAppName();
        Objects.requireNonNull(appName, "appName can't be empty!");

        String url = "http://%s/server/assert?appName=%s";
        for (String server : config.getServerAddress()) {
            String realUrl = String.format(url, server, appName);
            try {
                String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
                ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    Long appId = Long.valueOf(resultDTO.getData().toString());
                    log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
                    workerRuntime.setAppId(appId);
                    return;
                }else {
                    log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
                    throw new PowerJobException(resultDTO.getMessage());
                }
            }catch (PowerJobException oe) {
                throw oe;
            }catch (Exception ignore) {
                log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
            }
        }
        log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
        throw new PowerJobException("no server available!");
    }
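assertAppName iterates over the servers and calls http://%s/server/assert?appName=%s to obtain the appId for the appName, then sets it on workerRuntime; it returns as soon as one server succeeds. If a server rejects the appName, a PowerJobException is thrown immediately. If a server cannot be reached, the next one is tried, and a PowerJobException is thrown when no server is available.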
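The three outcomes per server (success, rejection, unreachable) can be sketched without any HTTP. Everything here is hypothetical scaffolding: the probe function stands in for the HTTP call plus JSON parsing, and RejectedException stands in for PowerJobException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class AssertAppNameSketch {

    // Stand-in for PowerJobException: the server answered and rejected the appName.
    static final class RejectedException extends RuntimeException {
        RejectedException(String message) {
            super(message);
        }
    }

    // Tries each server in order. A successful probe returns the appId at once,
    // a rejection is rethrown immediately, and any other failure moves on to the
    // next server. If every server fails, the method gives up.
    static long firstAppId(List<String> servers, Function<String, Long> probe) {
        for (String server : servers) {
            try {
                return probe.apply(server);
            } catch (RejectedException e) {
                throw e; // an invalid appName will not become valid on another node
            } catch (RuntimeException e) {
                // unreachable server: the original logs a warning and continues
            }
        }
        throw new IllegalStateException("no server available!");
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("down:7700", "up:7700");
        long appId = firstAppId(servers, server -> {
            if (server.startsWith("down")) {
                throw new RuntimeException("connection refused");
            }
            return 1L;
        });
        System.out.println(appId);
    }
}
```

The ordering of the catch clauses matters in the same way as in the original: the specific exception type must be caught and rethrown first, otherwise the general handler would swallow a rejection and keep trying other servers.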

buildProcessorLoader

private ProcessorLoader buildProcessorLoader(WorkerRuntime runtime) {
        List<ProcessorFactory> customPF = Optional.ofNullable(runtime.getWorkerConfig().getProcessorFactoryList()).orElse(Collections.emptyList());
        List<ProcessorFactory> finalPF = Lists.newArrayList(customPF);

        // append the 2 system ProcessorFactory instances last
        finalPF.add(new BuiltInDefaultProcessorFactory());
        finalPF.add(new JarContainerProcessorFactory(runtime));

        return new PowerJobProcessorLoader(finalPF);
    }
buildProcessorLoader takes the existing processorFactoryList (which already contains BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory) and appends BuiltInDefaultProcessorFactory and JarContainerProcessorFactory.
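Putting setApplicationContext and buildProcessorLoader together gives the final order of the factory list. The sketch below uses plain strings in place of the real factory objects, and its class and method names are hypothetical; it only reproduces the list-building steps shown above. How PowerJobProcessorLoader consumes the list is not covered in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class FactoryOrderSketch {

    // Mirrors PowerJobSpringWorker.setApplicationContext: user-supplied
    // factories (possibly null) come first, then the two Spring factories.
    static List<String> withSpringFactories(List<String> custom) {
        List<String> factories = new ArrayList<>(
                Optional.ofNullable(custom).orElse(Collections.emptyList()));
        factories.add("BuiltInSpringProcessorFactory");
        factories.add("BuildInSpringMethodProcessorFactory");
        return factories;
    }

    // Mirrors PowerJobWorker.buildProcessorLoader: the two system factories
    // are appended after whatever is already configured.
    static List<String> withSystemFactories(List<String> configured) {
        List<String> factories = new ArrayList<>(
                Optional.ofNullable(configured).orElse(Collections.emptyList()));
        factories.add("BuiltInDefaultProcessorFactory");
        factories.add("JarContainerProcessorFactory");
        return factories;
    }

    public static void main(String[] args) {
        List<String> custom = Collections.singletonList("MyCustomFactory");
        System.out.println(withSystemFactories(withSpringFactories(custom)));
    }
}
```

Both steps copy the incoming list before appending, so a list passed in by the user is never mutated.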

destroy

public void destroy() throws Exception {
        workerRuntime.getExecutorManager().shutdown();
        remoteEngine.close();
    }
The destroy method shuts down the executor manager via workerRuntime.getExecutorManager().shutdown() and closes the remote engine via remoteEngine.close().

Summary

PowerJobAutoConfiguration mainly creates a PowerJobSpringWorker from the PowerJobProperties.Worker settings, and PowerJobSpringWorker brings PowerJobWorker into the Spring container. Its setApplicationContext method adds BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory to the processorFactoryList in config. The init method of PowerJobWorker validates the appName, then initializes the thread pools, ProcessorLoader, actors, remoteEngine, serverDiscoveryService, omsLogHandler, and taskPersistenceService, and schedules WorkerHealthReporter and logSubmitter.
