Calling DataX from Java and Returning the Job Execution Result
DataX
DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS) and DRDS.
A detailed introduction to DataX
See DataX-Introduction.
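Introduction
For a business requirement we needed DataX to load data from text files into a database. Until then, we launched every job by running the datax.py script with Python. To manage DataX jobs more closely, Awen asked us to modify DataX so that it could be called from Java as an embedded component and would return detailed information about each job run.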
Tracing the DataX source code
We downloaded the source from GitHub and started modifying it. DataX starts from the static entry method of the Engine class in the datax-core module:
public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);

    String jobPath = cl.getOptionValue("job");

    // If the user does not set a jobid explicitly, datax.py passes the default value -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");

    Configuration configuration = ConfigParser.parse(jobPath);

    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }

    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // Outside standalone mode, jobId must not be -1
        // (message: "a valid jobId must be provided in the URL in non-standalone mode")
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                "非 standalone 模式必须在 URL 中提供有效的 jobId.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

    // Print VM info
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }

    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
    LOG.debug(configuration.toJSON());

    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}
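The job id logic in entry() is easy to misread. The standalone sketch below mirrors it. JobIdResolver and resolve are hypothetical names, and the loop is our own reimplementation of the role parseJobIdFromUrl plays, not DataX's code. It assumes that parseJobIdFromUrl returns -1 when no pattern matches, which is consistent with the jobId == -1 check above.

The precedence works like this. An explicit -jobid wins. Otherwise the first number captured by one of the three URL patterns is used, and if nothing matches the result stays -1. A local file path never matches, so a Java caller passing a file path has to run in standalone mode.

Note also that, as in entry(), a missing -jobid is not treated as -1. The null string is handed to Long.parseLong, which throws NumberFormatException. This is why the caller should pass -jobid -1 explicitly, as the test in this article does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical standalone sketch of how Engine.entry() resolves the job id.
 * It reimplements the idea for illustration; it is not DataX's parseJobIdFromUrl.
 */
public class JobIdResolver {

    // The same three URL patterns that entry() passes to parseJobIdFromUrl
    private static final List<String> PATTERNS = Arrays.asList(
            "/instance/(\\d{1,})/config.xml",
            "/inner/job/(\\d{1,})/config",
            "/inner/job/(\\d{1,})/taskGroup/");

    /** Explicit id if given, else the first id found in jobPath, else -1. */
    public static long resolve(String jobIdString, String jobPath) {
        // Same check as entry(): a null jobIdString falls through to parseLong and throws
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            return Long.parseLong(jobIdString);
        }
        for (String p : PATTERNS) {
            Matcher m = Pattern.compile(p).matcher(jobPath);
            if (m.find()) {
                return Long.parseLong(m.group(1));
            }
        }
        return -1L; // no id in the path: only acceptable in standalone mode
    }

    public static void main(String[] args) {
        System.out.println(resolve("7", "job.json"));                          // 7
        System.out.println(resolve("-1", "http://host/inner/job/42/config"));  // 42
        System.out.println(resolve("-1", "D:\\datax\\job\\job2.json"));        // -1
    }
}
```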
At the end, entry() starts the job by calling engine.start(configuration). If we drill into that method, we find that it ultimately calls the start() method of JobContainer:
@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");

    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();
            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();
            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("Exception when job run", e);
        hasException = true;

        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }

        if (super.getContainerCommunicator() == null) {
            // containerCollector is initialized in scheduler(), so if an exception is thrown
            // before scheduler() runs, containerCollector has to be initialized here
            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
            super.setContainerCommunicator(tempContainerCollector);
        }

        Communication communication = super.getContainerCommunicator().collect();
        // The state before reporting does not need to be set manually
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);

        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);

        Communication reportCommunication =
                CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);

        throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!isDryRun) {
            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // Finally, print the average CPU usage and the GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }

                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}
The job statistics we need are produced in this.logStatistics():
private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }

    if (super.getContainerCommunicator() == null) {
        return;
    }

    Communication communication = super.getContainerCommunicator().collect();
    communication.setTimestamp(this.endTimeStamp);

    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);

    Communication reportCommunication =
            CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

    // Byte rate
    long byteSpeedPerSecond =
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES) / transferCosts;
    long recordSpeedPerSecond =
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS) / transferCosts;

    reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
    reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

    super.getContainerCommunicator().report(reportCommunication);

    LOG.info(String.format(
            "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n",
            "任务启动时刻", dateFormat.format(startTimeStamp),                // job start time
            "任务结束时刻", dateFormat.format(endTimeStamp),                  // job end time
            "任务总计耗时", String.valueOf(totalCosts) + "s",                 // total elapsed time
            "任务平均流量", StrUtil.stringify(byteSpeedPerSecond) + "/s",     // average throughput
            "记录写入速度", String.valueOf(recordSpeedPerSecond) + "rec/s",   // record write speed
            "读出记录总数", String.valueOf(CommunicationTool.getTotalReadRecords(communication)),  // total records read
            "读写失败总数", String.valueOf(CommunicationTool.getTotalErrorRecords(communication))  // total read/write failures
    ));

    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|"
            + dateFormat.format(endTimeStamp) + "|"
            + String.valueOf(totalCosts) + "|"
            + StrUtil.stringify(byteSpeedPerSecond) + "|"
            + String.valueOf(recordSpeedPerSecond) + "|"
            + String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|"
            + String.valueOf(CommunicationTool.getTotalErrorRecords(communication)));

    if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
        LOG.info(String.format(
                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                "Transformer成功记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),  // records succeeded
                "Transformer失败记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),   // records failed
                "Transformer过滤记录总数", communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)    // records filtered
        ));
    }
}
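The summary figures come from simple integer arithmetic. The sketch below reproduces it; ThroughputSketch is a hypothetical name, and its inputs are plain numbers rather than Communication counters. Durations are truncated to whole seconds, and a transfer shorter than one second is counted as one second. Each rate is then the total divided by that number of seconds. Consequently, for sub-second jobs the reported rate equals the total, and every rate is rounded down.

```java
/**
 * Hypothetical sketch of the arithmetic in JobContainer.logStatistics().
 * Inputs are plain numbers; in DataX they come from Communication counters.
 */
public class ThroughputSketch {

    /** Whole seconds between two millisecond timestamps (integer division truncates). */
    public static long seconds(long startMillis, long endMillis) {
        return (endMillis - startMillis) / 1000;
    }

    /** Per-second rate; as in logStatistics, a 0-second transfer is counted as 1 second. */
    public static long perSecond(long total, long transferSeconds) {
        if (transferSeconds == 0L) {
            transferSeconds = 1L; // avoids division by zero for very short jobs
        }
        return total / transferSeconds;
    }

    public static void main(String[] args) {
        long transferCosts = seconds(1_000L, 5_999L);           // 4 (4.999 s truncated)
        System.out.println(perSecond(10_000L, transferCosts));  // 2500 bytes/s
        System.out.println(perSecond(300L, seconds(0L, 800L))); // 300: 0 s counted as 1 s
    }
}
```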
Making the changes
Add a new result class, DataxResult (getters and setters omitted):
public class DataxResult {
    // Job start time (ms)
    private long startTimeStamp;
    // Job end time (ms)
    private long endTimeStamp;
    // Total job time (s)
    private long totalCosts;
    // Average throughput (bytes/s)
    private long byteSpeedPerSecond;
    // Record write speed (records/s)
    private long recordSpeedPerSecond;
    // Total records read
    private long totalReadRecords;
    // Total read/write failures
    private long totalErrorRecords;
    // Transformer: records succeeded
    private long transformerSucceedRecords;
    // Transformer: records failed
    private long transformerFailedRecords;
    // Transformer: records filtered
    private long transformerFilterRecords;
    // Bytes read successfully
    private long readSucceedBytes;
    // Transfer end time (ms)
    private long endTransferTimeStamp;
    // Transfer start time (ms)
    private long startTransferTimeStamp;
    // Total transfer time (s)
    private long transferCosts;

    // getters, setters and getResultMsg(...) omitted
}
Rewrite logStatistics so that it returns this object:
private DataxResult logStatistics(DataxResult resultMsg) {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }

    if (super.getContainerCommunicator() == null) {
        return resultMsg;
    }

    Communication communication = super.getContainerCommunicator().collect();

    long byteSpeedPerSecond =
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES) / transferCosts;
    long recordSpeedPerSecond =
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS) / transferCosts;

    // getResultMsg(...) is the author's helper on DataxResult (not shown); it is assumed
    // to take its arguments in the same order as the fields are declared
    return resultMsg.getResultMsg(startTimeStamp, endTimeStamp, totalCosts,
            byteSpeedPerSecond, recordSpeedPerSecond,
            CommunicationTool.getTotalReadRecords(communication),
            CommunicationTool.getTotalErrorRecords(communication),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
            this.endTransferTimeStamp, this.startTransferTimeStamp, transferCosts);
}
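Passing fourteen positional long arguments to getResultMsg makes it easy to put a value in the wrong slot, because the compiler cannot tell one long from another. One alternative is to set each field by name using chained setters. DataxResultSketch is hypothetical and keeps only three of the fields; the remaining ones would follow the same pattern.

```java
/**
 * Hypothetical, trimmed-down DataxResult with chained setters,
 * so every value is named at the call site instead of passed by position.
 */
public class DataxResultSketch {
    private long totalReadRecords;
    private long totalErrorRecords;
    private long transformerFailedRecords;

    // Each setter returns this, so calls can be chained
    public DataxResultSketch totalReadRecords(long v) { this.totalReadRecords = v; return this; }
    public DataxResultSketch totalErrorRecords(long v) { this.totalErrorRecords = v; return this; }
    public DataxResultSketch transformerFailedRecords(long v) { this.transformerFailedRecords = v; return this; }

    public long getTotalReadRecords() { return totalReadRecords; }
    public long getTotalErrorRecords() { return totalErrorRecords; }
    public long getTransformerFailedRecords() { return transformerFailedRecords; }

    public static void main(String[] args) {
        // In logStatistics these values would come from CommunicationTool / Communication
        DataxResultSketch r = new DataxResultSketch()
                .totalReadRecords(1000L)
                .totalErrorRecords(3L)
                .transformerFailedRecords(0L);
        System.out.println(r.getTotalReadRecords() + " read, " + r.getTotalErrorRecords() + " errors");
    }
}
```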
We also need to rewrite JobContainer's start() method:
@Override // only compiles if AbstractContainer also declares start(DataxResult)
public DataxResult start(DataxResult dataxResult) {
    ...
    DataxResult result = logStatistics(dataxResult); // replaces this.logStatistics()
    ...
    return result;
}
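Two details of the modified start() are hidden behind the ellipses. First, if logStatistics still runs in the finally block as in the original, the result variable has to be declared before the try so that it is still in scope at the final return. Second, the catch block rethrows on failure, so the caller receives an exception rather than a DataxResult.

The self-contained sketch below illustrates that control flow; it is not the author's actual code. It uses a hypothetical mini-container, ContainerFlowSketch, whose stage names are copied from JobContainer.start(), along with a one-field Result stand-in.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical mini-container mirroring JobContainer.start(): stages run in order,
 * a failure is rethrown, and statistics are filled in finally only on success.
 */
public class ContainerFlowSketch {

    /** Stand-in for DataxResult with a single field. */
    public static class Result {
        public long endTimeStamp;
    }

    private final List<String> executed = new ArrayList<>();
    private final String failAt; // stage that throws, or null for a clean run

    public ContainerFlowSketch(String failAt) { this.failAt = failAt; }

    private void stage(String name) {
        executed.add(name);
        if (name.equals(failAt)) {
            throw new IllegalStateException("failed in " + name);
        }
    }

    public Result start(Result result) {
        boolean hasException = false;
        try {
            for (String s : new String[] {"preHandle", "init", "prepare", "split", "schedule", "post", "postHandle"}) {
                stage(s);
            }
        } catch (RuntimeException e) {
            hasException = true;
            throw new RuntimeException("job failed", e); // JobContainer wraps it in a DataXException
        } finally {
            if (!hasException) {
                // where logStatistics(result) would fill in the statistics
                result.endTimeStamp = System.currentTimeMillis();
            }
        }
        return result; // declared outside try, so still in scope here
    }

    public List<String> executed() { return executed; }
}
```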
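Then add test methods to the Engine class. The first is mockentry, a static variant of entry() that returns a DataxResult, presumably by calling mockstart(configuration) where entry() calls engine.start(configuration). The second is mockstart, a variant of start():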
public DataxResult mockstart(Configuration allConf) {
    ...
    DataxResult dataxResult = new DataxResult();
    return container.start(dataxResult);
}
Testing
In com.alibaba.datax.core.util.container.CoreConstant, change DATAX_HOME to your local DataX path.
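DATAX_HOME must point to a DataX installation directory with the usual subdirectories (conf, plugin, job and so on). The test below reads its job file from the job subdirectory.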
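Editing the constant works, but there is an alternative. In the DataX versions we have looked at, CoreConstant.DATAX_HOME is initialized from System.getProperty("datax.home"), the same property datax.py passes with -Ddatax.home; check this against your own version. If that holds, you can pass -Ddatax.home=... to the JVM, or call System.setProperty before anything touches CoreConstant.

The catch is timing. A static field is evaluated only once, when its class is initialized, so setting the property afterwards has no effect. The same is, as far as we can tell, true of the paths CoreConstant derives from DATAX_HOME.

The sketch below demonstrates this timing with a hypothetical stand-in class, HomeHolder. It is not CoreConstant itself.

```java
/**
 * Demonstrates static-field initialization timing with a stand-in for CoreConstant.
 * HomeHolder is hypothetical; it only mimics a constant read from a system property.
 */
public class DataxHomeSketch {

    static class HomeHolder {
        // Evaluated once, when HomeHolder is first used (class initialization)
        static final String DATAX_HOME = System.getProperty("datax.home");
    }

    public static String readHome() {
        return HomeHolder.DATAX_HOME;
    }

    public static void main(String[] args) {
        // Must run before the first access to HomeHolder, or the earlier value sticks
        System.setProperty("datax.home", "D:/datax");
        System.out.println(readHome());                 // D:/datax
        System.setProperty("datax.home", "E:/other");
        System.out.println(readHome());                 // still D:/datax
    }
}
```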
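import java.io.File;

import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.container.CoreConstant;
// also import DataxResult from the package you created it in

public class DataxTest {
    public static void main(String[] args) {
        // File.separator keeps the job path valid on both Windows and Linux
        String jobPath = CoreConstant.DATAX_HOME + File.separator + "job" + File.separator + "job2.json";
        String[] dataxArgs = {"-job", jobPath, "-mode", "standalone", "-jobid", "-1"};
        try {
            DataxResult dataxResult = Engine.mockentry(dataxArgs);
            System.out.println("records read: " + dataxResult.getTotalReadRecords()
                    + ", errors: " + dataxResult.getTotalErrorRecords());
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}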
Execution result:
Done!
That concludes this article on calling DataX from Java and returning the job execution result. I hope it serves as a useful reference, and I hope you will continue to support Script House (脚本之家).