PowerJob的AbstractScriptProcessor實(shí)現(xiàn)類工作流程源碼解讀
序
本文主要研究一下PowerJob的AbstractScriptProcessor
AbstractScriptProcessor
tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java
@Slf4j
public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
protected static final String SH_SHELL = "/bin/sh";
protected static final String CMD_SHELL = "cmd.exe";
private static final String WORKER_DIR = PowerFileUtils.workspace() + "/official_script_processor/";
@Override
protected ProcessResult process0(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
String scriptParams = CommonUtils.parseParams(context);
omsLogger.info("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams);
if (scriptParams == null) {
String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration.";
omsLogger.warn(message);
return new ProcessResult(false, message);
}
String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams);
omsLogger.info("[SYSTEM] Generate executable file successfully, path: {}", scriptPath);
if (SystemUtils.IS_OS_WINDOWS) {
if (StringUtils.equals(getRunCommand(), SH_SHELL)) {
String message = String.format("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME);
omsLogger.warn(message);
return new ProcessResult(false, message);
}
}
// 授權(quán)
if ( !SystemUtils.IS_OS_WINDOWS) {
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回,這里不可能導(dǎo)致死鎖(shell產(chǎn)生大量數(shù)據(jù)可能導(dǎo)致死鎖)
chmodPb.start().waitFor();
omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
}
// 2. 執(zhí)行目標(biāo)腳本
ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ?
new ProcessBuilder(getRunCommand(), "/c", scriptPath)
: new ProcessBuilder(getRunCommand(), scriptPath);
Process process = pb.start();
StringBuilder inputBuilder = new StringBuilder();
StringBuilder errorBuilder = new StringBuilder();
boolean success = true;
String result;
final Charset charset = getCharset();
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset));
POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset));
success = process.waitFor() == 0;
} catch (InterruptedException ie) {
omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted");
} finally {
result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString());
}
return new ProcessResult(success, result);
}
/**
* 生成腳本名稱
* @param instanceId id of instance
* @return 文件名稱
*/
protected abstract String getScriptName(Long instanceId);
/**
* 獲取運(yùn)行命令(eg,shell返回 /bin/sh)
* @return 執(zhí)行腳本的命令
*/
protected abstract String getRunCommand();
//......
}AbstractScriptProcessor繼承了CommonBasicProcessor,它定義了一個(gè)parallelism為4*Runtime.getRuntime().availableProcessors()的ForkJoinPool;其process0方法先讀取scriptParams,然后執(zhí)行prepareScriptFile獲取scriptPath,接著使用chmod變更script權(quán)限為755,然后通過getRunCommand獲取命令,接著往pool提交copyStream,等待process返回
prepareScriptFile
private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException {
String scriptPath = WORKER_DIR + getScriptName(instanceId);
File script = new File(scriptPath);
if (script.exists()) {
return scriptPath;
}
File dir = new File(script.getParent());
boolean success = dir.mkdirs();
success = script.createNewFile();
if (!success) {
throw new RuntimeException("create script file failed");
}
// 如果是下載鏈接,則從網(wǎng)絡(luò)獲取
for (String protocol : DOWNLOAD_PROTOCOL) {
if (processorInfo.startsWith(protocol)) {
FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
return scriptPath;
}
}
final Charset charset = getCharset();
if(charset != null)
{
try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) {
out.write(processorInfo);
out.flush();
}
}
else {
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
bw.write(processorInfo);
bw.flush();
}
}
return scriptPath;
}prepareScriptFile先通過getScriptName獲取scriptPath,如果是http、https、ftp鏈接則通過FileUtils.copyURLToFile下載,否則把scriptParams寫入到scriptPath
copyStream
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
String line;
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
while ((line = br.readLine()) != null) {
sb.append(line);
// 同步到在線日志
omsLogger.info(line);
}
} catch (Exception e) {
log.warn("[ScriptProcessor] copyStream failed.", e);
omsLogger.warn("[SYSTEM] copyStream failed.", e);
sb.append("Exception: ").append(e);
}
}copyStream會讀取InputStream到StringBuilder,同時(shí)打印到omsLogger
ShellProcessor
tech/powerjob/official/processors/impl/script/ShellProcessor.java
public class ShellProcessor extends AbstractScriptProcessor {
@Override
protected String getScriptName(Long instanceId) {
return String.format("shell_%d.sh", instanceId);
}
@Override
protected String getRunCommand() {
return SH_SHELL;
}
}ShellProcessor的getScriptName是基于shell_%d.sh和instanceId生成的;其getRunCommand為/bin/sh
CMDProcessor
tech/powerjob/official/processors/impl/script/CMDProcessor.java
public class CMDProcessor extends AbstractScriptProcessor {
@Override
protected String getScriptName(Long instanceId) {
return String.format("cmd_%d.bat", instanceId);
}
@Override
protected String getRunCommand() {
return "cmd.exe";
}
@Override
protected Charset getCharset() {
return Charset.defaultCharset();
}
}CMDProcessor的getScriptName是基于cmd_%d.bat和instanceId生成,其getRunCommand為`cmd.exe
PowerShellProcessor
tech/powerjob/official/processors/impl/script/PowerShellProcessor.java
public class PowerShellProcessor extends AbstractScriptProcessor {
@Override
protected String getScriptName(Long instanceId) {
return String.format("powershell_%d.ps1", instanceId);
}
@Override
protected String getRunCommand() {
return "powershell.exe";
}
@Override
protected Charset getCharset() {
return Charset.defaultCharset();
}
}PowerShellProcessor的getScriptName是基于powershell_%d.ps1"和instanceId生成,其getRunCommand為powershell.exe
PythonProcessor
tech/powerjob/official/processors/impl/script/PythonProcessor.java
public class PythonProcessor extends AbstractScriptProcessor {
@Override
protected String getScriptName(Long instanceId) {
return String.format("python_%d.py", instanceId);
}
@Override
protected String getRunCommand() {
return "python";
}
}PythonProcessor的getScriptName是基于python_%d.py和instanceId生成,其getRunCommand為python
小結(jié)
AbstractScriptProcessor繼承了CommonBasicProcessor,它有四個(gè)實(shí)現(xiàn)類分別是ShellProcessor、CMDProcessor、PowerShellProcessor、PythonProcessor;它定義了getScriptName、getRunCommand抽象方法;其process0方法主要是把scriptParams寫入到本地文件(scriptParams是http、https、ftp的則根據(jù)url進(jìn)行下載),然后修改權(quán)限為755,然后執(zhí)行pb.start(),再將input及errorStream收集到StringBuilder并打印到omsLogger,最后process.waitFor()等待處理完成。
以上就是PowerJob的AbstractScriptProcessor方法工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob AbstractScriptProcessor的資料請關(guān)注腳本之家其它相關(guān)文章!
- PowerJob的TimingStrategyHandler工作流程源碼解讀
- PowerJob的IdGenerateService工作流程源碼解讀
- PowerJob LockService方法工作流程源碼解讀
- PowerJob的Evaluator方法工作流程源碼解讀
- PowerJob的DatabaseMonitorAspect源碼流程
- PowerJob的WorkerHealthReporter工作流程源碼解讀
- PowerJob的OmsLogHandler工作流程源碼解析
- PowerJob的ServerDiscoveryService工作流程源碼解讀
- PowerJob的ProcessorLoader工作流程源碼解讀
- PowerJob的DispatchStrategy方法工作流程源碼解讀
相關(guān)文章
SpringBoot集成Caffeine緩存的實(shí)現(xiàn)步驟
Caffeine cache是一個(gè)針對Java的高性能緩存庫。在本文中,我們將介紹它與Spring Boot如何一起使用。2021-05-05
java開發(fā)中常遇到的各種難點(diǎn)以及解決思路方案
Java項(xiàng)目是一個(gè)復(fù)雜的軟件開發(fā)過程,其中會涉及到很多技術(shù)難點(diǎn),這篇文章主要給大家介紹了關(guān)于java開發(fā)中常遇到的各種難點(diǎn)以及解決思路方案的相關(guān)資料,需要的朋友可以參考下2023-07-07
基于Spring實(shí)現(xiàn)自定義錯誤信息返回詳解
這篇文章主要為大家詳細(xì)介紹了如何基于Spring實(shí)現(xiàn)自定義錯誤信息返回效果,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-03-03
Java--SSH,SSM和Spring?Boot框架區(qū)別及優(yōu)缺點(diǎn)說明
這篇文章主要介紹了Java--SSH,SSM和Spring?Boot框架區(qū)別及優(yōu)缺點(diǎn)說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12
Spring Boot 中的 @PutMapping 注解原理及使用小結(jié)
在本文中,我們介紹了 Spring Boot 中的 @PutMapping 注解,它可以將 HTTP PUT 請求映射到指定的處理方法上,我們還介紹了 @PutMapping 注解的原理以及如何在 Spring Boot 中使用它,感興趣的朋友跟隨小編一起看看吧2023-12-12
springboot中PostMapping正常接收json參數(shù)后返回404問題
這篇文章主要介紹了springboot中PostMapping正常接收json參數(shù)后返回404問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05
Java異常處理中同時(shí)有finally和return語句的執(zhí)行問題
這篇文章主要介紹了Java異常處理中同時(shí)有finally和return語句的執(zhí)行問題,首先確定的是一般finally語句都會被執(zhí)行...然后,需要的朋友可以參考下2015-11-11

