PowerJob的DesignateServer工作流程源碼解讀
序
本文主要研究一下PowerJob的DesignateServer
DesignateServer
tech/powerjob/server/remote/server/redirector/DesignateServer.java
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface DesignateServer { /** * 轉(zhuǎn)發(fā)請求需要 AppInfo 下的 currentServer 信息,因此必須要有 appId 作為入?yún)ⅲ撟侄沃付?appId 字段的參數(shù)名稱,默認(rèn)為 appId * @return appId 參數(shù)名稱 */ String appIdParameterName() default "appId"; }
DesignateServer注解定義了appIdParameterName屬性,默認(rèn)是appId
DesignateServerAspect
tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java
@Slf4j @Aspect @Component @Order(0) @RequiredArgsConstructor public class DesignateServerAspect { private final TransportService transportService; private final AppInfoRepository appInfoRepository; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Around(value = "@annotation(designateServer))") public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable { // 參數(shù) Object[] args = point.getArgs(); // 方法名 String methodName = point.getSignature().getName(); // 類名 String className = point.getSignature().getDeclaringTypeName(); Signature signature = point.getSignature(); // 方法簽名 MethodSignature methodSignature = (MethodSignature) signature; String[] parameterNames = methodSignature.getParameterNames(); String[] parameterTypes = Arrays.stream(methodSignature.getParameterTypes()).map(Class::getName).toArray(String[]::new); Long appId = null; for (int i = 0; i < parameterNames.length; i++) { if (StringUtils.equals(parameterNames[i], designateServer.appIdParameterName())) { appId = Long.parseLong(String.valueOf(args[i])); break; } } if (appId == null) { throw new PowerJobException("can't find appId in params for:" + signature); } // 獲取執(zhí)行機(jī)器 AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new PowerJobException("can't find app info")); String targetServer = appInfo.getCurrentServer(); // 目標(biāo)IP為空,本地執(zhí)行 if (StringUtils.isEmpty(targetServer)) { return point.proceed(); } // 目標(biāo)IP與本地符合則本地執(zhí)行 if (Objects.equals(targetServer, transportService.defaultProtocol().getAddress())) { return point.proceed(); } log.info("[DesignateServerAspect] the method[{}] should execute in server[{}], so this request will be redirect to remote server!", signature.toShortString(), targetServer); // 轉(zhuǎn)發(fā)請求,遠(yuǎn)程執(zhí)行后返回結(jié)果 RemoteProcessReq remoteProcessReq = new RemoteProcessReq() .setClassName(className) .setMethodName(methodName) .setParameterTypes(parameterTypes) .setArgs(args); final URL friendUrl = ServerURLFactory.process2Friend(targetServer); CompletionStage<AskResponse> askCS = transportService.ask(Protocol.HTTP.name(), friendUrl, remoteProcessReq, AskResponse.class); AskResponse askResponse = askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (!askResponse.isSuccess()) { throw new PowerJobException("remote process failed: " + askResponse.getMessage()); } // 考慮范型情況 Method method = methodSignature.getMethod(); JavaType returnType = getMethodReturnJavaType(method); return OBJECT_MAPPER.readValue(askResponse.getData(), returnType); } //...... }
DesignateServerAspect攔截了@DesignateServer注解,它先解析方法參數(shù)名,取出參數(shù)名與@DesignateServer的appIdParameterName一致的參數(shù)值,再通過appInfoRepository.findById找到AppInfoDO,獲取appInfo.getCurrentServer();若currentServer就是本機(jī)則執(zhí)行point.proceed(),否則構(gòu)建RemoteProcessReq,通過transportService.ask轉(zhuǎn)發(fā)請求
示例
tech/powerjob/server/core/instance/InstanceLogService.java
/** * 獲取日志的下載鏈接 * @param appId AOP 專用 * @param instanceId 任務(wù)實(shí)例 ID * @return 下載鏈接 */ @DesignateServer public String fetchDownloadUrl(Long appId, Long instanceId) { String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId; log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url); return url; }
fetchDownloadUrl指定了@DesignateServer注解,會根據(jù)appId的值限定在指定server執(zhí)行
小結(jié)
PowerJob的DesignateServer注解定義了appIdParameterName屬性,默認(rèn)是appId;DesignateServerAspect攔截了@DesignateServer注解,它判斷currentServer就是本機(jī)則執(zhí)行point.proceed(),否則構(gòu)建RemoteProcessReq,通過transportService.ask轉(zhuǎn)發(fā)請求到指定server執(zhí)行。
以上就是PowerJob的DesignateServer工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob DesignateServer的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
feign post參數(shù)對象不加@RequestBody的使用說明
這篇文章主要介紹了feign post參數(shù)對象不加@RequestBody的使用說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10Java?MethodHandles介紹與反射對比區(qū)別詳解
這篇文章主要為大家介紹了Java?MethodHandles介紹與反射對比區(qū)別詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11Servlet和Filter之間的區(qū)別與聯(lián)系
這篇文章主要介紹了Servlet和Filter之間的區(qū)別與聯(lián)系的相關(guān)資料,需要的朋友可以參考下2016-05-05使用Swagger時Controller中api接口顯示不全的問題分析及解決
swagger是一個十分好用的api接口管理、測試框架,現(xiàn)在越來越多的人使用這個做接口的測試和管理,但經(jīng)常遇到Controller中的api接口顯示不全的問題,所以本文給大家詳細(xì)分析了問題以及解決方法,需要的朋友可以參考下2024-02-02windows 部署JAVA環(huán)境安裝iDea的詳細(xì)步驟
這篇文章主要介紹了windows 部署JAVA環(huán)境安裝iDea的詳細(xì)步驟,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08