Java定時(shí)調(diào)用.ktr文件的示例代碼(解決方案)
1.Maven依賴
<!-- Kettle -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>7.1.0.0-12</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>7.1.0.0-12</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>metastore</artifactId>
<version>7.1.0.0-12</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<!-- connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
注意:kettle的jar包依賴會(huì)拉不下來(lái),需要將jar包install到本地,命令:
創(chuàng)建 0_install.bat 文件
:: 本地 install kettle-core.jar包 CALL mvn install:install-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar :: 本地 install kettle-engine.jar包 CALL mvn install:install-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar :: 本地 install metastore.jar包 CALL mvn install:install-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar pause
或者deploy到內(nèi)網(wǎng)私服上,命令:
創(chuàng)建 1_deploy.bat 文件
:: 私服 deploy kettle-core.jar包 CALL mvn deploy:deploy-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服務(wù)ID :: 私服 deploy kettle-engine.jar包 CALL mvn deploy:deploy-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服務(wù)ID :: 私服 deploy metastore.jar包 CALL mvn deploy:deploy-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服務(wù)ID pause
(腳本創(chuàng)建在jar包目錄下,創(chuàng)建好之后雙擊運(yùn)行即可)
【jar包、腳本文件下載地址】
https://share.weiyun.com/eaOSjqP7
2.執(zhí)行.ktr/.kjb工具類
KettleReadUtils.java
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.io.InputStream;
/**
* <p> @Title KettleReadUtils
* <p> @Description Kettle工具包
*
* @author zhj
* @date 2021/4/8 10:50
*/
public class KettleReadUtils {
/**
* 調(diào)用 kettle ktr
*
* @param path 文件路徑
*/
public static void runKtr(String path) {
try {
KettleEnvironment.init();
EnvUtil.environmentInit();
TransMeta transMeta = new TransMeta(path);
Trans trans = new Trans(transMeta);
trans.execute(null);
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
throw new Exception("Errors during transformation execution!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 以流的方式調(diào)用 kettle ktr
*
* @param in 文件流
*/
public static void runKtrByStream(InputStream in) {
try {
KettleEnvironment.init();
TransMeta transMeta = new TransMeta(in, null, true, null, null);
Trans trans = new Trans(transMeta);
trans.execute(null);
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
throw new Exception("Errors during transformation execution!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 調(diào)用 kettle job
*
* @param paraNames 多個(gè)參數(shù)名
* @param paraValues 多個(gè)參數(shù)值
* @param jobPath 如: String fName= "D:\\kettle\\aaa.kjb";
*/
public static void runJob(String[] paraNames, String[] paraValues, String jobPath) {
try {
KettleEnvironment.init();
JobMeta jobMeta = new JobMeta(jobPath, null);
Job job = new Job(null, jobMeta);
// 向Job 腳本傳遞參數(shù),腳本中獲取參數(shù)值:${參數(shù)名}
if (paraNames != null && paraValues != null) {
for (int i = 0; i < paraNames.length && i < paraValues.length; i++) {
job.setVariable(paraNames[i], paraValues[i]);
}
}
job.start();
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception("Errors during job execution!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.創(chuàng)建.ktr/.kjb工具類
(此處只是提供java創(chuàng)建途徑,可以直接使用Spoon.bat創(chuàng)建好的文件)
import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import java.io.File;
/**
* <p> @Title KettleReadUtils
* <p> @Description Kettle工具包
*
* @author zhj
* @date 2021/4/8 10:50
*/
public class KettleWriteUtils {
/**
* 數(shù)據(jù)庫(kù)連接信息,適用于DatabaseMeta其中 一個(gè)構(gòu)造器DatabaseMeta(String xml)
*/
private static final String DATABASE_XML_1 =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>db1</name>" +
"<server>127.0.0.1</server>" +
"<type>MYSQL</type>" +
"<access>Native</access>" +
"<database>test</database>" +
"<port>3306</port>" +
"<username>root</username>" +
"<password>root</password>" +
"</connection>";
private static final String DATABASE_XML_2 =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>db2</name>" +
"<server>127.0.0.1</server>" +
"<type>MYSQL</type>" +
"<access>Native</access>" +
"<database>test</database>" +
"<port>3306</port>" +
"<username>root</username>" +
"<password>root</password>" +
"</connection>";
/**
* 創(chuàng)建ktr文件
*
* @param args args
*/
public static void main(String[] args) {
try {
KettleEnvironment.init();
KettleWriteUtils kettleWriteUtils = new KettleWriteUtils();
TransMeta transMeta = kettleWriteUtils.generateMyOwnTrans();
String transXml = transMeta.getXML();
String transName = "update_insert_Trans.ktr";
File file = new File(transName);
FileUtils.writeStringToFile(file, transXml, "UTF-8");
} catch (Exception e) {
e.printStackTrace();
return;
}
}
/**
* 生成一個(gè)轉(zhuǎn)化,把一個(gè)數(shù)據(jù)庫(kù)中的數(shù)據(jù)轉(zhuǎn)移到另一個(gè)數(shù)據(jù)庫(kù)中,只有兩個(gè)步驟,第一個(gè)是表輸入,第二個(gè)是表插入與更新操作
* @return 元數(shù)據(jù)
* @throws KettleXMLException 生成XML異常
*/
private TransMeta generateMyOwnTrans() throws KettleXMLException {
TransMeta transMeta = new TransMeta();
//設(shè)置轉(zhuǎn)化的名稱
transMeta.setName("insert_update");
//添加轉(zhuǎn)換的數(shù)據(jù)庫(kù)連接
DatabaseMeta databaseMeta1 = new DatabaseMeta(DATABASE_XML_1);
transMeta.addDatabase(databaseMeta1);
DatabaseMeta databaseMeta2 = new DatabaseMeta(DATABASE_XML_2);
transMeta.addDatabase(databaseMeta2);
//registry是給每個(gè)步驟生成一個(gè)標(biāo)識(shí)Id用
PluginRegistry registry = PluginRegistry.getInstance();
//第一個(gè)表輸入步驟(TableInputMeta)
TableInputMeta tableInput = new TableInputMeta();
String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
//給表輸入添加一個(gè)DatabaseMeta連接數(shù)據(jù)庫(kù)
DatabaseMeta db1 = transMeta.findDatabase("db1");
tableInput.setDatabaseMeta(db1);
String sql = "SELECT USER_ID,USER_NAME FROM t_manager_user";
tableInput.setSQL(sql);
//添加TableInputMeta到轉(zhuǎn)換中
StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId, "table input", tableInput);
//給步驟添加在spoon工具中的顯示位置
tableInputMetaStep.setDraw(true);
tableInputMetaStep.setLocation(100, 100);
transMeta.addStep(tableInputMetaStep);
//第二個(gè)步驟插入與更新
InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
//添加數(shù)據(jù)庫(kù)連接
DatabaseMeta db2 = transMeta.findDatabase("db2");
insertUpdateMeta.setDatabaseMeta(db2);
//設(shè)置操作的表
insertUpdateMeta.setTableName("t_stat_user_info");
//設(shè)置用來(lái)查詢的關(guān)鍵字
insertUpdateMeta.setKeyLookup(new String[]{"USER_ID"});
insertUpdateMeta.setKeyStream(new String[]{"USER_ID"});
insertUpdateMeta.setKeyStream2(new String[]{""});
insertUpdateMeta.setKeyCondition(new String[]{"="});
//設(shè)置要更新的字段
String[] updatelookup = {"USER_ID","USER_NAME"} ;
String[] updateStream = {"USER_ID","USER_NAME"} ;
Boolean[] updateOrNot = {false,true};
insertUpdateMeta.setUpdateLookup(updatelookup);
insertUpdateMeta.setUpdateStream(updateStream);
insertUpdateMeta.setUpdate(updateOrNot);
//添加步驟到轉(zhuǎn)換中
StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update", insertUpdateMeta);
insertUpdateStep.setDraw(true);
insertUpdateStep.setLocation(250, 100);
transMeta.addStep(insertUpdateStep);
//添加hop把兩個(gè)步驟關(guān)聯(lián)起來(lái)
transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
return transMeta;
}
}
4.測(cè)試執(zhí)行.ktr文件
執(zhí)行用例:
public static void main(String[] args) {
InputStream inputStream = KettleReadUtils.class.getResourceAsStream("/etl/test.ktr");
runKtrByStream(inputStream);
}
.ktr文件位置:

執(zhí)行結(jié)果:
5.Kettle所使用的mysql-connector 5.1.49 和 8 版本不兼容問(wèn)題
- mysql-connector-java 5.1.49 版本中,支持連接驅(qū)動(dòng),
org.gjt.mm.mysql.Driver; - mysql-connector-java 8.* 版本中,連接驅(qū)動(dòng),
com.mysql.cj.jdbc.Driver; - 如果直接使用 8.* 版本 去連接 MySQL 數(shù)據(jù)庫(kù)的話會(huì)出現(xiàn)"錯(cuò)誤連接數(shù)據(jù)庫(kù)"問(wèn)題:
Driver class ‘org.gjt.mm.mysql.Driver' could not be found, make sure the ‘MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver

解決方案:
1.關(guān)閉Kettle;
2.將/data-integration/lib/ 下面的 mysql-connector-java-5.1.49.jar 替換為 mysql-connector-java-8.*.jar;
3.打開(kāi)Kettle,修改連接類型為 Generic database ,配置驅(qū)動(dòng)名稱為 com.mysql.cj.jdbc.Driver;
4.重新導(dǎo)出為.ktr/.kjb文件;
5.再用java調(diào)用即可解決問(wèn)題。

整理完畢,完結(jié)撒花~
到此這篇關(guān)于Java定時(shí)調(diào)用.ktr文件的示例代碼的文章就介紹到這了,更多相關(guān)Java定時(shí)調(diào)用.ktr文件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java實(shí)戰(zhàn)之簡(jiǎn)單的文件管理器
- java不解壓直接讀取壓縮包中文件的實(shí)現(xiàn)方法
- Java IO流學(xué)習(xí)總結(jié)之文件傳輸基礎(chǔ)
- 淺談javap命令拆解字節(jié)碼文件
- JavaWeb實(shí)現(xiàn)文件的上傳與下載
- Java 如何實(shí)現(xiàn)解壓縮文件和文件夾
- Java(TM) Platform SE binary 打開(kāi)jar文件的操作
- java 如何讀取遠(yuǎn)程主機(jī)文件
- java自定義ClassLoader加載指定的class文件操作
- IntelliJ IDEA創(chuàng)建普通的Java 項(xiàng)目及創(chuàng)建 Java 文件并運(yùn)行的教程
- java中Servlet程序下載文件實(shí)例詳解
- Java基礎(chǔ)之文件概述
相關(guān)文章
SpringBoot使用Redis對(duì)用戶IP進(jìn)行接口限流的示例詳解
使用接口限流的主要目的在于提高系統(tǒng)的穩(wěn)定性,防止接口被惡意打擊,這篇文章主要介紹了SpringBoot使用Redis對(duì)用戶IP進(jìn)行接口限流的示例代碼,需要的朋友可以參考下2023-07-07
Kotlin與Java 泛型缺陷和應(yīng)用場(chǎng)景詳解
這篇文章主要為大家介紹了Kotlin與Java 泛型缺陷和應(yīng)用場(chǎng)景詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
Spring boot自定義http反饋狀態(tài)碼詳解
這篇文章主要給大家介紹了Spring boot自定義http反饋狀態(tài)碼的相關(guān)資料,文中介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編一起來(lái)學(xué)習(xí)學(xué)習(xí)吧。2017-06-06
SpringBoot?整合?EasyExcel?實(shí)現(xiàn)自由導(dǎo)入導(dǎo)出功能
在實(shí)際的業(yè)務(wù)系統(tǒng)開(kāi)發(fā)過(guò)程中,操作 Excel 實(shí)現(xiàn)數(shù)據(jù)的導(dǎo)入導(dǎo)出基本上是個(gè)非常常見(jiàn)的需求,這篇文章主要介紹了SpringBoot?整合?EasyExcel?實(shí)現(xiàn)自由導(dǎo)入導(dǎo)出功能,需要的朋友可以參考下2024-06-06
Java JTable 實(shí)現(xiàn)日歷的示例
這篇文章主要介紹了Java JTable 實(shí)現(xiàn)日歷的示例,幫助大家更好的理解和學(xué)習(xí)Java jtable的使用方法,感興趣的朋友可以了解下2020-10-10

