
Calling DataX from Java and returning the job's execution details

Updated: 2019-08-28 10:48:49   Author: 沉梦杨志

DataX

DataX is an offline data synchronization tool/platform used widely within Alibaba Group. It provides efficient synchronization between heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS) and DRDS.

A detailed introduction to DataX

See DataX-Introduction.

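Background

Our business needed DataX to load data from text files into a database. Until now we had launched the job scripts from Python through datax.py. To manage DataX jobs more closely, Awen asked us to modify DataX so that it can be called directly from Java and return detailed information about how each job ran.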

Tracing the DataX source code

After downloading the source from GitHub, we started modifying it. DataX's entry point is the static method entry in the Engine class of the datax-core module:

public static void entry(final String[] args) throws Throwable {
 Options options = new Options();
 options.addOption("job", true, "Job config.");
 options.addOption("jobid", true, "Job unique id.");
 options.addOption("mode", true, "Job runtime mode.");

 BasicParser parser = new BasicParser();
 CommandLine cl = parser.parse(options, args);

 String jobPath = cl.getOptionValue("job");

 // If the user does not specify a jobid explicitly, datax.py passes the default value -1
 String jobIdString = cl.getOptionValue("jobid");
 RUNTIME_MODE = cl.getOptionValue("mode");

 Configuration configuration = ConfigParser.parse(jobPath);

 long jobId;
 if (!"-1".equalsIgnoreCase(jobIdString)) {
  jobId = Long.parseLong(jobIdString);
 } else {
  // only for dsc & ds & datax 3 update
  String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
  String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
  String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
  List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
   dsJobUrlPatternString, dsTaskGroupUrlPatternString);
  jobId = parseJobIdFromUrl(patternStringList, jobPath);
 }

 boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
 if (!isStandAloneMode && jobId == -1) {
  // Outside standalone mode, jobId must not be -1
  throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "Non-standalone mode requires a valid jobId in the URL.");
 }
 configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

 // print vmInfo
 VMInfo vmInfo = VMInfo.getVmInfo();
 if (vmInfo != null) {
  LOG.info(vmInfo.toString());
 }

 LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

 LOG.debug(configuration.toJSON());

 ConfigurationValidate.doValidate(configuration);
 Engine engine = new Engine();
 engine.start(configuration);
 }
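To see the jobId rules of entry() in isolation, here is a DataX-free sketch. It swaps commons-cli for a hypothetical parseArgs helper, and its parseJobIdFromUrl is an assumed reimplementation (the method is not shown above): the first pattern that matches supplies the id, otherwise the result is -1. The URL patterns and the standalone check are taken from the code above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdResolver {

    // The three URL patterns used in Engine.entry().
    static final List<String> PATTERNS = Arrays.asList(
            "/instance/(\\d{1,})/config.xml",
            "/inner/job/(\\d{1,})/config",
            "/inner/job/(\\d{1,})/taskGroup/");

    // Hypothetical stand-in for commons-cli: {"-job", "a", "-mode", "b"} -> {job=a, mode=b}.
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            options.put(args[i].replaceFirst("^-+", ""), args[i + 1]);
        }
        return options;
    }

    // Assumed behaviour of parseJobIdFromUrl: first matching pattern wins, no match yields -1.
    static long parseJobIdFromUrl(List<String> patterns, String url) {
        for (String p : patterns) {
            Matcher m = Pattern.compile(p).matcher(url);
            if (m.find()) {
                return Long.parseLong(m.group(1));
            }
        }
        return -1L;
    }

    // Same decision logic as entry(): an explicit -jobid wins, otherwise parse it from the job path.
    static long resolveJobId(String[] args) {
        Map<String, String> options = parseArgs(args);
        // Treat a missing -jobid like datax.py's default of -1.
        String jobIdString = options.getOrDefault("jobid", "-1");
        long jobId = !"-1".equalsIgnoreCase(jobIdString)
                ? Long.parseLong(jobIdString)
                : parseJobIdFromUrl(PATTERNS, options.get("job"));
        boolean standalone = "standalone".equalsIgnoreCase(options.get("mode"));
        if (!standalone && jobId == -1) {
            throw new IllegalArgumentException("Non-standalone mode requires a valid jobId in the URL.");
        }
        return jobId;
    }
}
```

In standalone mode with -jobid -1 and a local file path, none of the patterns match, so the job simply runs with id -1; that is the mode the test later in this article uses.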

entry() ends by calling engine.start(configuration). Following that call down, you find that it ultimately delegates to the start() method of JobContainer:

@Override
 public void start() {
 LOG.info("DataX jobContainer starts job.");

 boolean hasException = false;
 boolean isDryRun = false;
 try {
  this.startTimeStamp = System.currentTimeMillis();
  isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
  if (isDryRun) {
  LOG.info("jobContainer starts to do preCheck ...");
  this.preCheck();
  } else {
  userConf = configuration.clone();
  LOG.debug("jobContainer starts to do preHandle ...");
  this.preHandle();

  LOG.debug("jobContainer starts to do init ...");
  this.init();
  LOG.info("jobContainer starts to do prepare ...");
  this.prepare();
  LOG.info("jobContainer starts to do split ...");
  this.totalStage = this.split();
  LOG.info("jobContainer starts to do schedule ...");
  this.schedule();
  LOG.debug("jobContainer starts to do post ...");
  this.post();

  LOG.debug("jobContainer starts to do postHandle ...");
  this.postHandle();
  LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

  this.invokeHooks();
  }
 } catch (Throwable e) {
  LOG.error("Exception when job run", e);

  hasException = true;

  if (e instanceof OutOfMemoryError) {
  this.destroy();
  System.gc();
  }


  if (super.getContainerCommunicator() == null) {
  // containerCollector is initialized in scheduler(), so if an exception occurs before scheduler() runs, it has to be initialized here

  AbstractContainerCommunicator tempContainerCollector;
  // standalone
  tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);

  super.setContainerCommunicator(tempContainerCollector);
  }

  Communication communication = super.getContainerCommunicator().collect();
  // the state before reporting does not need to be set manually
  // communication.setState(State.FAILED);
  communication.setThrowable(e);
  communication.setTimestamp(this.endTimeStamp);

  Communication tempComm = new Communication();
  tempComm.setTimestamp(this.startTransferTimeStamp);

  Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
  super.getContainerCommunicator().report(reportCommunication);

  throw DataXException.asDataXException(
   FrameworkErrorCode.RUNTIME_ERROR, e);
 } finally {
  if (!isDryRun) {

  this.destroy();
  this.endTimeStamp = System.currentTimeMillis();
  if (!hasException) {
   // finally, print average CPU usage and GC statistics
   VMInfo vmInfo = VMInfo.getVmInfo();
   if (vmInfo != null) {
   vmInfo.getDelta(false);
   LOG.info(vmInfo.totalString());
   }

   LOG.info(PerfTrace.getInstance().summarizeNoException());
   this.logStatistics();
  }
  }
 }
 }
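The control flow of start() is easier to follow without the DataX types. The hypothetical LifecycleSketch class below records each phase in a list; the phase names come from the code above, while the class itself, the failAt switch and the "report-failure" marker are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, DataX-free skeleton of the control flow in JobContainer.start().
public class LifecycleSketch {

    final List<String> trace = new ArrayList<>();
    private final String failAt; // name of the phase that should throw, or null

    LifecycleSketch(String failAt) {
        this.failAt = failAt;
    }

    private void phase(String name) {
        trace.add(name);
        if (name.equals(failAt)) {
            throw new IllegalStateException("failure in " + name);
        }
    }

    public void start(boolean dryRun) {
        boolean hasException = false;
        try {
            if (dryRun) {
                phase("preCheck");
            } else {
                for (String p : new String[]{"preHandle", "init", "prepare", "split",
                        "schedule", "post", "postHandle", "invokeHooks"}) {
                    phase(p);
                }
            }
        } catch (RuntimeException e) {
            hasException = true;
            trace.add("report-failure"); // the real code reports via the container communicator
            throw e;                      // the real code wraps it in a DataXException
        } finally {
            if (!dryRun) {
                trace.add("destroy");
                if (!hasException) {
                    trace.add("logStatistics"); // statistics only on success
                }
            }
        }
    }
}
```

The failure trace makes one point visible: logStatistics() runs only when hasException is false, so a failed job never reaches the statistics. Any version of start() that is meant to return them inherits that behaviour.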

The job statistics we need are produced in this.logStatistics():

private void logStatistics() {
 long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
 long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
 if (0L == transferCosts) {
  transferCosts = 1L;
 }

 if (super.getContainerCommunicator() == null) {
  return;
 }

 Communication communication = super.getContainerCommunicator().collect();
 communication.setTimestamp(this.endTimeStamp);

 Communication tempComm = new Communication();
 tempComm.setTimestamp(this.startTransferTimeStamp);

 Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

 // byte rate
 long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
  / transferCosts;

 long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
  / transferCosts;

 reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
 reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

 super.getContainerCommunicator().report(reportCommunication);


 LOG.info(String.format(
  "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
   + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
   + "%-26s: %19s\n",
  "任务启动时刻", // job start time
  dateFormat.format(startTimeStamp),

  "任务结束时刻", // job end time
  dateFormat.format(endTimeStamp),

  "任务总计耗时", // total elapsed time
  String.valueOf(totalCosts) + "s",
  "任务平均流量", // average throughput
  StrUtil.stringify(byteSpeedPerSecond)
   + "/s",
  "记录写入速度", // record write speed
  String.valueOf(recordSpeedPerSecond)
   + "rec/s",
  "读出记录总数", // total records read
  String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
  "读写失败总数", // total read/write failures
  String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
 ));

 LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
  dateFormat.format(endTimeStamp) + "|" +
  String.valueOf(totalCosts) + "|" +
  StrUtil.stringify(byteSpeedPerSecond) + "|" +
  String.valueOf(recordSpeedPerSecond) + "|" +
  String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +
  String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
 );

 if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
  || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
  || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
  LOG.info(String.format(
   "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
   "Transformer成功记录总数", // transformer: records succeeded
   communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),

   "Transformer失败记录总数", // transformer: records failed
   communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),

   "Transformer过滤记录总数", // transformer: records filtered out
   communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
  ));
 }
 }
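Both speeds are integer arithmetic on whole seconds. The hypothetical TransferStats helper below repeats that arithmetic outside DataX so it can be checked with concrete numbers; the class name and the sample inputs are made up for illustration.

```java
public class TransferStats {

    // Result holder: transfer time in whole seconds plus the two derived speeds.
    static final class Speeds {
        final long transferCosts;
        final long bytesPerSecond;
        final long recordsPerSecond;

        Speeds(long transferCosts, long bytesPerSecond, long recordsPerSecond) {
            this.transferCosts = transferCosts;
            this.bytesPerSecond = bytesPerSecond;
            this.recordsPerSecond = recordsPerSecond;
        }
    }

    // Same arithmetic as logStatistics(): milliseconds -> whole seconds by integer
    // division, clamped to 1 s so the speed division never divides by zero.
    static Speeds compute(long startTransferMs, long endTransferMs,
                          long readSucceedBytes, long readSucceedRecords) {
        long transferCosts = (endTransferMs - startTransferMs) / 1000;
        if (transferCosts == 0L) {
            transferCosts = 1L;
        }
        return new Speeds(transferCosts,
                readSucceedBytes / transferCosts,
                readSucceedRecords / transferCosts);
    }
}
```

For example, a 12.5 s transfer of 1,200,000 bytes and 24,000 records is counted as 12 s, giving 100000 B/s and 2000 rec/s; a transfer shorter than one second is counted as 1 s.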

Making the changes

Add a result entity, DataxResult (getters and setters omitted):

public class DataxResult {
 // job start time (epoch ms)
 private long startTimeStamp;
 // job end time (epoch ms)
 private long endTimeStamp;
 // total job duration (s)
 private long totalCosts;
 // average throughput (bytes/s)
 private long byteSpeedPerSecond;
 // record write speed (records/s)
 private long recordSpeedPerSecond;
 // total records read
 private long totalReadRecords;
 // total read/write failures
 private long totalErrorRecords;
 // transformer: records succeeded
 private long transformerSucceedRecords;
 // transformer: records failed
 private long transformerFailedRecords;
 // transformer: records filtered out
 private long transformerFilterRecords;
 // bytes read successfully
 private long readSucceedBytes;
 // transfer end time (epoch ms)
 private long endTransferTimeStamp;
 // transfer start time (epoch ms)
 private long startTransferTimeStamp;
 // total transfer duration (s)
 private long transferCosts;

 // getters, setters and the getResultMsg(...) helper used below are omitted
}
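getResultMsg(...) is called below but never shown. One possible shape, assuming its parameters follow the field declaration order of DataxResult, is this hypothetical DataxResultSketch: it fills every field and returns itself, so logStatistics can return the call's result directly.

```java
// Hypothetical sketch of the omitted getResultMsg(...) helper; parameter order is
// assumed to match the field declaration order of DataxResult.
public class DataxResultSketch {
    private long startTimeStamp;
    private long endTimeStamp;
    private long totalCosts;
    private long byteSpeedPerSecond;
    private long recordSpeedPerSecond;
    private long totalReadRecords;
    private long totalErrorRecords;
    private long transformerSucceedRecords;
    private long transformerFailedRecords;
    private long transformerFilterRecords;
    private long readSucceedBytes;
    private long endTransferTimeStamp;
    private long startTransferTimeStamp;
    private long transferCosts;

    // Fills every field and returns this.
    public DataxResultSketch getResultMsg(long startTimeStamp, long endTimeStamp, long totalCosts,
                                          long byteSpeedPerSecond, long recordSpeedPerSecond,
                                          long totalReadRecords, long totalErrorRecords,
                                          long transformerSucceedRecords, long transformerFailedRecords,
                                          long transformerFilterRecords, long readSucceedBytes,
                                          long endTransferTimeStamp, long startTransferTimeStamp,
                                          long transferCosts) {
        this.startTimeStamp = startTimeStamp;
        this.endTimeStamp = endTimeStamp;
        this.totalCosts = totalCosts;
        this.byteSpeedPerSecond = byteSpeedPerSecond;
        this.recordSpeedPerSecond = recordSpeedPerSecond;
        this.totalReadRecords = totalReadRecords;
        this.totalErrorRecords = totalErrorRecords;
        this.transformerSucceedRecords = transformerSucceedRecords;
        this.transformerFailedRecords = transformerFailedRecords;
        this.transformerFilterRecords = transformerFilterRecords;
        this.readSucceedBytes = readSucceedBytes;
        this.endTransferTimeStamp = endTransferTimeStamp;
        this.startTransferTimeStamp = startTransferTimeStamp;
        this.transferCosts = transferCosts;
        return this;
    }

    // A few of the omitted getters, enough to check the mapping.
    public long getTotalReadRecords() { return totalReadRecords; }
    public long getTotalErrorRecords() { return totalErrorRecords; }
    public long getTransformerSucceedRecords() { return transformerSucceedRecords; }
    public long getTransferCosts() { return transferCosts; }
}
```

A fourteen-argument positional method is easy to call in the wrong order without any compiler warning, since every parameter is a long; plain setters or a builder would make each value's destination explicit.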

Rewrite logStatistics so that it fills in and returns this entity:

private DataxResult logStatistics(DataxResult resultMsg) {
 long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
 long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
 if (0L == transferCosts) {
  transferCosts = 1L;
 }
 if (super.getContainerCommunicator() == null) {
  return resultMsg;
 }
 Communication communication = super.getContainerCommunicator().collect();
 long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
  / transferCosts;
 long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
  / transferCosts;
  
 // Arguments follow the field order of DataxResult (getResultMsg itself is not shown here).
 return resultMsg.getResultMsg(startTimeStamp,
  endTimeStamp,
  totalCosts,
  byteSpeedPerSecond,
  recordSpeedPerSecond,
  CommunicationTool.getTotalReadRecords(communication),
  CommunicationTool.getTotalErrorRecords(communication),
  communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
  communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
  communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
  communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
  this.endTransferTimeStamp,
  this.startTransferTimeStamp,
  transferCosts
 );


 }

JobContainer also needs a variant of its start() method that returns the result:

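 // A new overload rather than a true override: unless AbstractContainer also
 // declares start(DataxResult), an @Override annotation here would not compile.
 public DataxResult start(DataxResult dataxResult) {
 DataxResult result = null;
 ... // body identical to start()
 // in the finally block, where start() calls this.logStatistics():
 result = logStatistics(dataxResult);
 ...
 return result;
 }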
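A value-returning start() has to build its result inside the finally block, where logStatistics() runs, yet return it afterwards. The sketch below shows that shape with stand-in types; Stats and the Runnable parameter are hypothetical substitutes for DataxResult and the real phases.

```java
// Hypothetical, DataX-free sketch of a start() overload that returns statistics.
public class ReturningStartSketch {

    // Stand-in for DataxResult.
    static final class Stats {
        final long totalCosts;

        Stats(long totalCosts) {
            this.totalCosts = totalCosts;
        }
    }

    private long startTimeStamp;
    private long endTimeStamp;

    // 'phases' stands in for preHandle() ... postHandle().
    public Stats start(Runnable phases) {
        Stats result = null;
        boolean hasException = false;
        try {
            startTimeStamp = System.currentTimeMillis();
            phases.run();
        } catch (RuntimeException e) {
            hasException = true;
            throw e; // JobContainer wraps this in a DataXException
        } finally {
            endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // Where the original start() calls this.logStatistics().
                result = new Stats((endTimeStamp - startTimeStamp) / 1000);
            }
        }
        // Return after the finally block, never from inside it.
        return result;
    }
}
```

A return inside finally would also compile, but it would discard the exception from a failed job and hand back null; keeping the return after the block lets failures propagate as they do in the original start().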

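Then add to the Engine class a static test entry point, mockentry (mirroring entry(); the test below calls it), and a mockstart method (mirroring start()) that it delegates to:

 // sketch: same as entry(), but returns the result of mockstart
 public static DataxResult mockentry(final String[] args) throws Throwable {
 ... // identical to entry(), up to ConfigurationValidate.doValidate(configuration)
 Engine engine = new Engine();
 return engine.mockstart(configuration);
 }

 public DataxResult mockstart(Configuration allConf) {
 ... // same setup as start(Configuration); declare container as a JobContainer
  // so that start(DataxResult) is visible
 DataxResult dataxResult = new DataxResult();
 return container.start(dataxResult);
 }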

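Testing

In com.alibaba.datax.core.util.container.CoreConstant, change DATAX_HOME to a local path.

That DATAX_HOME directory must contain a DataX installation layout (for example conf/core.json and the plugin directory), and the job file used below sits in its job subdirectory.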
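Before calling mockentry it can help to verify the DATAX_HOME layout up front. The hypothetical DataxHomeCheck helper below builds paths with java.nio.file instead of hard-coded separators; the required entries (conf/core.json, plugin, job/&lt;file&gt;) are an assumption about a typical DataX installation, not a list from this article.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class DataxHomeCheck {

    // Returns the entries under dataxHome that are missing, relative to dataxHome.
    // Assumed minimum layout: conf/core.json, plugin/, and job/<jobFileName>.
    static List<String> missingEntries(Path dataxHome, String jobFileName) {
        Path[] required = {
            dataxHome.resolve("conf").resolve("core.json"),
            dataxHome.resolve("plugin"),
            jobPath(dataxHome, jobFileName)
        };
        List<String> missing = new ArrayList<>();
        for (Path p : required) {
            if (!Files.exists(p)) {
                missing.add(dataxHome.relativize(p).toString());
            }
        }
        return missing;
    }

    // Portable equivalent of DATAX_HOME + "\\job\\job2.json".
    static Path jobPath(Path dataxHome, String jobFileName) {
        return dataxHome.resolve("job").resolve(jobFileName);
    }
}
```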

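import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.container.CoreConstant;

import java.io.File;

// DataxResult must also be imported from wherever you placed it.
public class DataxTest {

 public static void main(String[] args) {
  // File.separator instead of a hard-coded "\\" so the path also works outside Windows
  String jobPath = CoreConstant.DATAX_HOME + File.separator + "job" + File.separator + "job2.json";
  String[] dataxArgs = {"-job", jobPath, "-mode", "standalone", "-jobid", "-1"};
  try {
   DataxResult dataxResult = Engine.mockentry(dataxArgs);
   System.out.println("records read: " + dataxResult.getTotalReadRecords());
  } catch (Throwable e) {
   e.printStackTrace();
  }
 }
}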

(Figure: execution result)

And that's it!

That concludes this article on calling DataX from Java and returning the job's execution details. I hope it serves as a useful reference, and thank you for supporting Script House (脚本之家).
