Java定時調(diào)用.ktr文件的示例代碼(解決方案)
1.Maven依賴
<!-- Kettle --> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>7.1.0.0-12</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-engine</artifactId> <version>7.1.0.0-12</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>metastore</artifactId> <version>7.1.0.0-12</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>1.4</version> </dependency> <!-- connector --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency>
注意:kettle的jar包依賴會拉不下來,需要將jar包install到本地,命令:
創(chuàng)建 0_install.bat 文件
:: 本地 install kettle-core.jar包 CALL mvn install:install-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar :: 本地 install kettle-engine.jar包 CALL mvn install:install-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar :: 本地 install metastore.jar包 CALL mvn install:install-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar pause
或者deploy到內(nèi)網(wǎng)私服上,命令:
創(chuàng)建 1_deploy.bat 文件
:: 私服 deploy kettle-core.jar包 CALL mvn deploy:deploy-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服務(wù)ID :: 私服 deploy kettle-engine.jar包 CALL mvn deploy:deploy-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服務(wù)ID :: 私服 deploy metastore.jar包 CALL mvn deploy:deploy-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服務(wù)ID pause
(腳本創(chuàng)建在jar包目錄下,創(chuàng)建好之后雙擊運行即可)
【jar包、腳本文件下載地址】
https://share.weiyun.com/eaOSjqP7
2.執(zhí)行.ktr/.kjb工具類
KettleReadUtils.java
import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.util.EnvUtil; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import java.io.InputStream; /** * <p> @Title KettleReadUtils * <p> @Description Kettle工具包 * * @author zhj * @date 2021/4/8 10:50 */ public class KettleReadUtils { /** * 調(diào)用 kettle ktr * * @param path 文件路徑 */ public static void runKtr(String path) { try { KettleEnvironment.init(); EnvUtil.environmentInit(); TransMeta transMeta = new TransMeta(path); Trans trans = new Trans(transMeta); trans.execute(null); trans.waitUntilFinished(); if (trans.getErrors() > 0) { throw new Exception("Errors during transformation execution!"); } } catch (Exception e) { e.printStackTrace(); } } /** * 以流的方式調(diào)用 kettle ktr * * @param in 文件流 */ public static void runKtrByStream(InputStream in) { try { KettleEnvironment.init(); TransMeta transMeta = new TransMeta(in, null, true, null, null); Trans trans = new Trans(transMeta); trans.execute(null); trans.waitUntilFinished(); if (trans.getErrors() > 0) { throw new Exception("Errors during transformation execution!"); } } catch (Exception e) { e.printStackTrace(); } } /** * 調(diào)用 kettle job * * @param paraNames 多個參數(shù)名 * @param paraValues 多個參數(shù)值 * @param jobPath 如: String fName= "D:\\kettle\\aaa.kjb"; */ public static void runJob(String[] paraNames, String[] paraValues, String jobPath) { try { KettleEnvironment.init(); JobMeta jobMeta = new JobMeta(jobPath, null); Job job = new Job(null, jobMeta); // 向Job 腳本傳遞參數(shù),腳本中獲取參數(shù)值:${參數(shù)名} if (paraNames != null && paraValues != null) { for (int i = 0; i < paraNames.length && i < paraValues.length; i++) { job.setVariable(paraNames[i], paraValues[i]); } } job.start(); job.waitUntilFinished(); if (job.getErrors() > 0) { throw new Exception("Errors during job execution!"); } } catch (Exception e) { e.printStackTrace(); } } }
3.創(chuàng)建.ktr/.kjb工具類
(此處只是提供java創(chuàng)建途徑,可以直接使用Spoon.bat創(chuàng)建好的文件)
import org.apache.commons.io.FileUtils; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.plugins.PluginRegistry; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.trans.TransHopMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta; import org.pentaho.di.trans.steps.tableinput.TableInputMeta; import java.io.File; /** * <p> @Title KettleReadUtils * <p> @Description Kettle工具包 * * @author zhj * @date 2021/4/8 10:50 */ public class KettleWriteUtils { /** * 數(shù)據(jù)庫連接信息,適用于DatabaseMeta其中 一個構(gòu)造器DatabaseMeta(String xml) */ private static final String DATABASE_XML_1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>db1</name>" + "<server>127.0.0.1</server>" + "<type>MYSQL</type>" + "<access>Native</access>" + "<database>test</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>"; private static final String DATABASE_XML_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>db2</name>" + "<server>127.0.0.1</server>" + "<type>MYSQL</type>" + "<access>Native</access>" + "<database>test</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>"; /** * 創(chuàng)建ktr文件 * * @param args args */ public static void main(String[] args) { try { KettleEnvironment.init(); KettleWriteUtils kettleWriteUtils = new KettleWriteUtils(); TransMeta transMeta = kettleWriteUtils.generateMyOwnTrans(); String transXml = transMeta.getXML(); String transName = "update_insert_Trans.ktr"; File file = new File(transName); FileUtils.writeStringToFile(file, transXml, "UTF-8"); } catch (Exception e) { e.printStackTrace(); return; } } /** * 生成一個轉(zhuǎn)化,把一個數(shù)據(jù)庫中的數(shù)據(jù)轉(zhuǎn)移到另一個數(shù)據(jù)庫中,只有兩個步驟,第一個是表輸入,第二個是表插入與更新操作 * @return 元數(shù)據(jù) * @throws KettleXMLException 生成XML異常 */ private TransMeta generateMyOwnTrans() throws KettleXMLException { TransMeta transMeta = new TransMeta(); //設(shè)置轉(zhuǎn)化的名稱 transMeta.setName("insert_update"); //添加轉(zhuǎn)換的數(shù)據(jù)庫連接 DatabaseMeta databaseMeta1 = new DatabaseMeta(DATABASE_XML_1); transMeta.addDatabase(databaseMeta1); DatabaseMeta databaseMeta2 = new DatabaseMeta(DATABASE_XML_2); transMeta.addDatabase(databaseMeta2); //registry是給每個步驟生成一個標(biāo)識Id用 PluginRegistry registry = PluginRegistry.getInstance(); //第一個表輸入步驟(TableInputMeta) TableInputMeta tableInput = new TableInputMeta(); String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput); //給表輸入添加一個DatabaseMeta連接數(shù)據(jù)庫 DatabaseMeta db1 = transMeta.findDatabase("db1"); tableInput.setDatabaseMeta(db1); String sql = "SELECT USER_ID,USER_NAME FROM t_manager_user"; tableInput.setSQL(sql); //添加TableInputMeta到轉(zhuǎn)換中 StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId, "table input", tableInput); //給步驟添加在spoon工具中的顯示位置 tableInputMetaStep.setDraw(true); tableInputMetaStep.setLocation(100, 100); transMeta.addStep(tableInputMetaStep); //第二個步驟插入與更新 InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta); //添加數(shù)據(jù)庫連接 DatabaseMeta db2 = transMeta.findDatabase("db2"); insertUpdateMeta.setDatabaseMeta(db2); //設(shè)置操作的表 insertUpdateMeta.setTableName("t_stat_user_info"); //設(shè)置用來查詢的關(guān)鍵字 insertUpdateMeta.setKeyLookup(new String[]{"USER_ID"}); insertUpdateMeta.setKeyStream(new String[]{"USER_ID"}); insertUpdateMeta.setKeyStream2(new String[]{""}); insertUpdateMeta.setKeyCondition(new String[]{"="}); //設(shè)置要更新的字段 String[] updatelookup = {"USER_ID","USER_NAME"} ; String[] updateStream = {"USER_ID","USER_NAME"} ; Boolean[] updateOrNot = {false,true}; insertUpdateMeta.setUpdateLookup(updatelookup); insertUpdateMeta.setUpdateStream(updateStream); insertUpdateMeta.setUpdate(updateOrNot); //添加步驟到轉(zhuǎn)換中 StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update", insertUpdateMeta); insertUpdateStep.setDraw(true); insertUpdateStep.setLocation(250, 100); transMeta.addStep(insertUpdateStep); //添加hop把兩個步驟關(guān)聯(lián)起來 transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep)); return transMeta; } }
4.測試執(zhí)行.ktr文件
執(zhí)行用例:
public static void main(String[] args) { InputStream inputStream = KettleReadUtils.class.getResourceAsStream("/etl/test.ktr"); runKtrByStream(inputStream); }
.ktr文件位置:
執(zhí)行結(jié)果:
5.Kettle所使用的mysql-connector 5.1.49 和 8 版本不兼容問題
- mysql-connector-java 5.1.49 版本中,支持連接驅(qū)動,
org.gjt.mm.mysql.Driver
; - mysql-connector-java 8.* 版本中,連接驅(qū)動,
com.mysql.cj.jdbc.Driver
; - 如果直接使用 8.* 版本 去連接 MySQL 數(shù)據(jù)庫的話會出現(xiàn)"錯誤連接數(shù)據(jù)庫"問題:
Driver class ‘org.gjt.mm.mysql.Driver' could not be found, make sure the ‘MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver
解決方案:
1.關(guān)閉Kettle;
2.將/data-integration/lib/ 下面的 mysql-connector-java-5.1.49.jar
替換為 mysql-connector-java-8.*.jar
;
3.打開Kettle,修改連接類型為 Generic database ,配置驅(qū)動名稱為 com.mysql.cj.jdbc.Driver;
4.重新導(dǎo)出為.ktr/.kjb文件;
5.再用java調(diào)用即可解決問題。
整理完畢,完結(jié)撒花~
到此這篇關(guān)于Java定時調(diào)用.ktr文件的示例代碼的文章就介紹到這了,更多相關(guān)Java定時調(diào)用.ktr文件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java實戰(zhàn)之簡單的文件管理器
- java不解壓直接讀取壓縮包中文件的實現(xiàn)方法
- Java IO流學(xué)習(xí)總結(jié)之文件傳輸基礎(chǔ)
- 淺談javap命令拆解字節(jié)碼文件
- JavaWeb實現(xiàn)文件的上傳與下載
- Java 如何實現(xiàn)解壓縮文件和文件夾
- Java(TM) Platform SE binary 打開jar文件的操作
- java 如何讀取遠(yuǎn)程主機(jī)文件
- java自定義ClassLoader加載指定的class文件操作
- IntelliJ IDEA創(chuàng)建普通的Java 項目及創(chuàng)建 Java 文件并運行的教程
- java中Servlet程序下載文件實例詳解
- Java基礎(chǔ)之文件概述
相關(guān)文章
SpringBoot使用Redis對用戶IP進(jìn)行接口限流的示例詳解
使用接口限流的主要目的在于提高系統(tǒng)的穩(wěn)定性,防止接口被惡意打擊,這篇文章主要介紹了SpringBoot使用Redis對用戶IP進(jìn)行接口限流的示例代碼,需要的朋友可以參考下2023-07-07Spring boot自定義http反饋狀態(tài)碼詳解
這篇文章主要給大家介紹了Spring boot自定義http反饋狀態(tài)碼的相關(guān)資料,文中介紹的非常詳細(xì),對大家具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編一起來學(xué)習(xí)學(xué)習(xí)吧。2017-06-06SpringBoot?整合?EasyExcel?實現(xiàn)自由導(dǎo)入導(dǎo)出功能
在實際的業(yè)務(wù)系統(tǒng)開發(fā)過程中,操作 Excel 實現(xiàn)數(shù)據(jù)的導(dǎo)入導(dǎo)出基本上是個非常常見的需求,這篇文章主要介紹了SpringBoot?整合?EasyExcel?實現(xiàn)自由導(dǎo)入導(dǎo)出功能,需要的朋友可以參考下2024-06-06