PowerJob的ProcessorLoader工作流程源碼解讀
序
本文主要研究一下PowerJob的ProcessorLoader
ProcessorLoader
tech/powerjob/worker/processor/ProcessorLoader.java
public interface ProcessorLoader {
ProcessorBean load(ProcessorDefinition definition);
}ProcessorLoader定義了load方法,用于根據(jù)ProcessorDefinition加載ProcessorBean
ProcessorDefinition
tech/powerjob/worker/extension/processor/ProcessorDefinition.java
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class ProcessorDefinition implements Serializable {
/**
* 后臺(tái)配置的處理器類型
*/
private String processorType;
/**
* 后臺(tái)配置的處理器信息
*/
private String processorInfo;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProcessorDefinition that = (ProcessorDefinition) o;
return Objects.equals(processorType, that.processorType) && Objects.equals(processorInfo, that.processorInfo);
}
@Override
public int hashCode() {
return Objects.hash(processorType, processorInfo);
}
}ProcessorDefinition定義了processorType及processorInfo兩個(gè)屬性
ProcessorBean
tech/powerjob/worker/extension/processor/ProcessorBean.java
@Getter
@Setter
@Accessors(chain = true)
public class ProcessorBean {
/**
* 真正用來執(zhí)行邏輯的處理器對象
*/
private transient BasicProcessor processor;
/**
* 加載該處理器對象的 classLoader,可空,空則使用 {@link Object#getClass()#getClassLoader() 代替}
*/
private transient ClassLoader classLoader;
}ProcessorBean定義了BasicProcessor及ClassLoader兩個(gè)屬性
PowerJobProcessorLoader
tech/powerjob/worker/processor/PowerJobProcessorLoader.java
@Slf4j
public class PowerJobProcessorLoader implements ProcessorLoader {
private final List<ProcessorFactory> processorFactoryList;
private final Map<ProcessorDefinition, ProcessorBean> def2Bean = new ConcurrentHashMap<>(128);
public PowerJobProcessorLoader(List<ProcessorFactory> processorFactoryList) {
this.processorFactoryList = processorFactoryList;
}
@Override
public ProcessorBean load(ProcessorDefinition definition) {
return def2Bean.computeIfAbsent(definition, ignore -> {
final String processorType = definition.getProcessorType();
log.info("[ProcessorFactory] start to load Processor: {}", definition);
for (ProcessorFactory pf : processorFactoryList) {
final String pfName = pf.getClass().getSimpleName();
if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) {
log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType);
continue;
}
log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition);
try {
ProcessorBean processorBean = pf.build(definition);
if (processorBean != null) {
log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition);
return processorBean;
}
} catch (Throwable t) {
log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t);
}
}
throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");
});
}
}PowerJobProcessorLoader實(shí)現(xiàn)了ProcessorLoader接口,其構(gòu)造器要求傳入processorFactoryList,它還定義了def2Bean,用于維護(hù)ProcessorDefinition與ProcessorBean的關(guān)系;其load方法使用ConcurrentHashMap的computeIfAbsent,將加載好的ProcessorBean放入到def2Bean;其加載過程為遍歷processorFactoryList,找到支持該processorType的ProcessorFactory,然后執(zhí)行其build方法進(jìn)行構(gòu)造
ProcessorFactory
tech/powerjob/worker/extension/processor/ProcessorFactory.java
public interface ProcessorFactory {
/**
* 支持的處理器類型,類型不匹配則跳過該 ProcessorFactory 的加載邏輯
* 對應(yīng)的是控制臺(tái)的'處理器類型' TAB,不做任何定制的情況下,取值范圍為 {@link ProcessorType#name()}
* @return 支持的處理器類型
*/
Set<String> supportTypes();
/**
* 根據(jù)處理器定義構(gòu)建處理器對象
* 注意:Processor 為單例對象,即 PowerJob 對每一個(gè) ProcessorBean 只調(diào)用一次 build 方法
* @param processorDefinition 處理器定義
* @return null or ProcessorBean
*/
ProcessorBean build(ProcessorDefinition processorDefinition);
}ProcessorFactory接口定義了supportTypes、build方法;它有四個(gè)實(shí)現(xiàn)類,其中BuiltInSpringProcessorFactory及BuildInSpringMethodProcessorFactory繼承自AbstractBuildInSpringProcessorFactory,另外兩個(gè)為BuiltInDefaultProcessorFactory、JarContainerProcessorFactory
BuiltInDefaultProcessorFactory
tech/powerjob/worker/processor/impl/BuiltInDefaultProcessorFactory.java
@Slf4j
public class BuiltInDefaultProcessorFactory implements ProcessorFactory {
@Override
public Set<String> supportTypes() {
return Sets.newHashSet(ProcessorType.BUILT_IN.name());
}
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
String className = processorDefinition.getProcessorInfo();
try {
Class<?> clz = Class.forName(className);
BasicProcessor basicProcessor = (BasicProcessor) clz.getDeclaredConstructor().newInstance();
return new ProcessorBean()
.setProcessor(basicProcessor)
.setClassLoader(basicProcessor.getClass().getClassLoader());
}catch (Exception e) {
log.warn("[ProcessorFactory] load local Processor(className = {}) failed.", className, e);
}
return null;
}
}BuiltInDefaultProcessorFactory是默認(rèn)的處理器工廠,通過全限定類名加載處理器,但沒有IOC功能
JarContainerProcessorFactory
tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java
@Slf4j
public class JarContainerProcessorFactory implements ProcessorFactory {
private final WorkerRuntime workerRuntime;
public JarContainerProcessorFactory(WorkerRuntime workerRuntime) {
this.workerRuntime = workerRuntime;
}
@Override
public Set<String> supportTypes() {
return Sets.newHashSet(ProcessorType.EXTERNAL.name());
}
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
String processorInfo = processorDefinition.getProcessorInfo();
String[] split = processorInfo.split("#");
String containerName = split[0];
String className = split[1];
log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName);
OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime);
if (omsContainer != null) {
return new ProcessorBean()
.setProcessor(omsContainer.getProcessor(className))
.setClassLoader(omsContainer.getContainerClassLoader());
} else {
log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo);
}
return null;
}
}JarContainerProcessorFactory主要是通過OmsContainer來加載ProcessorBean
AbstractBuildInSpringProcessorFactory
tech/powerjob/worker/processor/impl/AbstractBuildInSpringProcessorFactory.java
@Slf4j
public abstract class AbstractBuildInSpringProcessorFactory implements ProcessorFactory {
protected final ApplicationContext applicationContext;
protected AbstractBuildInSpringProcessorFactory(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public Set<String> supportTypes() {
return Sets.newHashSet(ProcessorType.BUILT_IN.name());
}
protected boolean checkCanLoad() {
try {
ApplicationContext.class.getClassLoader();
return applicationContext != null;
} catch (Throwable ignore) {
}
return false;
}
@SuppressWarnings("unchecked")
protected static <T> T getBean(String className, ApplicationContext ctx) throws Exception {
// 0. 嘗試直接用 Bean 名稱加載
try {
final Object bean = ctx.getBean(className);
if (bean != null) {
return (T) bean;
}
} catch (Exception ignore) {
}
// 1. ClassLoader 存在,則直接使用 clz 加載
ClassLoader classLoader = ctx.getClassLoader();
if (classLoader != null) {
return (T) ctx.getBean(classLoader.loadClass(className));
}
// 2. ClassLoader 不存在(系統(tǒng)類加載器不可見),嘗試用類名稱小寫加載
String[] split = className.split("\\.");
String beanName = split[split.length - 1];
// 小寫轉(zhuǎn)大寫
char[] cs = beanName.toCharArray();
cs[0] += 32;
String beanName0 = String.valueOf(cs);
log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0);
return (T) ctx.getBean(beanName0);
}
}AbstractBuildInSpringProcessorFactory是兩個(gè)spring相關(guān)ProcessorFactory的抽象類,它使用spring的ApplicationContext來加載
BuiltInSpringProcessorFactory
tech/powerjob/worker/processor/impl/BuiltInSpringProcessorFactory.java
@Slf4j
public class BuiltInSpringProcessorFactory extends AbstractBuildInSpringProcessorFactory {
public BuiltInSpringProcessorFactory(ApplicationContext applicationContext) {
super(applicationContext);
}
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
try {
boolean canLoad = checkCanLoad();
if (!canLoad) {
log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuiltInSpringProcessorFactory'");
return null;
}
String processorInfo = processorDefinition.getProcessorInfo();
//用于區(qū)分方法級別的參數(shù)
if (processorInfo.contains("#")) {
return null;
}
BasicProcessor basicProcessor = getBean(processorInfo, applicationContext);
return new ProcessorBean()
.setProcessor(basicProcessor)
.setClassLoader(basicProcessor.getClass().getClassLoader());
} catch (NoSuchBeanDefinitionException ignore) {
log.warn("[ProcessorFactory] can't find the processor in SPRING");
} catch (Throwable t) {
log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t);
}
return null;
}
}BuiltInSpringProcessorFactory通過ApplicationContext加載spring相關(guān)的Bean,但它不處理processorInfo包含#的processorDefinition
BuildInSpringMethodProcessorFactory
tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java
@Slf4j
public class BuildInSpringMethodProcessorFactory extends AbstractBuildInSpringProcessorFactory {
private static final List<String> jobHandlerRepository = new LinkedList<>();
private final static String DELIMITER = "#";
public BuildInSpringMethodProcessorFactory(ApplicationContext applicationContext) {
super(applicationContext);
}
@Override
public ProcessorBean build(ProcessorDefinition processorDefinition) {
try {
boolean canLoad = checkCanLoad();
if (!canLoad) {
log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuildInSpringMethodProcessorFactory'");
return null;
}
String processorInfo = processorDefinition.getProcessorInfo();
if (!processorInfo.contains(DELIMITER)) {
log.info("[ProcessorFactory] can't parse processorDefinition, this processor can't load by 'BuildInSpringMethodProcessorFactory'");
return null;
}
String[] split = processorInfo.split(DELIMITER);
String methodName = split[1];
String className = split[0];
Object bean = getBean(className,applicationContext);
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
PowerJobHandler powerJob = method.getAnnotation(PowerJobHandler.class);
if (powerJob == null) {
continue;
}
String name = powerJob.name();
//匹配到和頁面定義相同的methodName
if (!name.equals(methodName)) {
continue;
}
if (name.trim().length() == 0) {
throw new RuntimeException("method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
if (containsJobHandler(name)) {
throw new RuntimeException("jobhandler[" + name + "] naming conflicts.");
}
method.setAccessible(true);
registerJobHandler(methodName);
MethodBasicProcessor processor = new MethodBasicProcessor(bean, method);
return new ProcessorBean()
.setProcessor(processor)
.setClassLoader(processor.getClass().getClassLoader());
}
} catch (NoSuchBeanDefinitionException ignore) {
log.warn("[ProcessorFactory] can't find the processor in SPRING");
} catch (Throwable t) {
log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t);
}
return null;
}
public static void registerJobHandler(String name) {
jobHandlerRepository.add(name);
}
private boolean containsJobHandler(String name) {
return jobHandlerRepository.contains(name);
}
}BuildInSpringMethodProcessorFactory專門用于processorInfo包含#的processorDefinition,它會(huì)遍歷指定class的methods,找到方法上標(biāo)注有@PowerJobHandler注解且方法名一致的method,注冊到j(luò)obHandlerRepository,其創(chuàng)建的是MethodBasicProcessor
小結(jié)
PowerJob的ProcessorLoader定義了load方法,用于根據(jù)ProcessorDefinition加載ProcessorBean;PowerJobProcessorLoader實(shí)現(xiàn)了ProcessorLoader接口,它會(huì)遍歷processorFactoryList,找到支持該processorType的ProcessorFactory,然后執(zhí)行其build方法進(jìn)行構(gòu)造;ProcessorFactory接口定義了supportTypes、build方法;它有四個(gè)實(shí)現(xiàn)類,其中BuiltInSpringProcessorFactory及BuildInSpringMethodProcessorFactory繼承自AbstractBuildInSpringProcessorFactory,另外兩個(gè)為BuiltInDefaultProcessorFactory、JarContainerProcessorFactory。
以上就是PowerJob的ProcessorLoader的詳細(xì)內(nèi)容,更多關(guān)于PowerJob ProcessorLoader的資料請關(guān)注腳本之家其它相關(guān)文章!
- PowerJob的TimingStrategyHandler工作流程源碼解讀
- PowerJob的IdGenerateService工作流程源碼解讀
- PowerJob LockService方法工作流程源碼解讀
- PowerJob的Evaluator方法工作流程源碼解讀
- PowerJob的DatabaseMonitorAspect源碼流程
- PowerJob的AbstractScriptProcessor實(shí)現(xiàn)類工作流程源碼解讀
- PowerJob的WorkerHealthReporter工作流程源碼解讀
- PowerJob的OmsLogHandler工作流程源碼解析
- PowerJob的ServerDiscoveryService工作流程源碼解讀
- PowerJob的DispatchStrategy方法工作流程源碼解讀
相關(guān)文章
Java數(shù)據(jù)結(jié)構(gòu)之簡單鏈表的定義與實(shí)現(xiàn)方法示例
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)之簡單鏈表的定義與實(shí)現(xiàn)方法,簡單描述了鏈接的概念、原理,并結(jié)合實(shí)例形式分析了java定義與使用鏈表的相關(guān)步驟與操作技巧,需要的朋友可以參考下2017-10-10
Maven在Java8下如何忽略Javadoc的編譯錯(cuò)誤詳解
這篇文章主要給大家介紹了關(guān)于Maven在Java8下如何忽略Javadoc的編譯錯(cuò)誤的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08
mybatis 使用jdbc.properties文件設(shè)置不起作用的解決方法
這篇文章主要介紹了mybatis 使用jdbc.properties文件設(shè)置不起作用的解決方法,需要的朋友可以參考下2018-03-03
Spring MVC簡介_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
Spring MVC屬于SpringFrameWork的后續(xù)產(chǎn)品,已經(jīng)融合在Spring Web Flow里面。今天先從寫一個(gè)Spring MVC的HelloWorld開始,讓我們看看如何搭建起一個(gè)Spring mvc的環(huán)境并運(yùn)行程序,感興趣的朋友一起學(xué)習(xí)吧2017-08-08
jackson序列化和反序列化的應(yīng)用實(shí)踐指南
這篇文章主要給大家介紹了關(guān)于jackson序列化和反序列化的應(yīng)用實(shí)踐指南,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
Javas使用Redlock實(shí)現(xiàn)分布式鎖過程解析
這篇文章主要介紹了Javas使用Redlock實(shí)現(xiàn)分布式鎖過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08
Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示
本文主要介紹了Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示2024-06-06

