SpringBoot 集成 Kettle的實現(xiàn)示例
Kettle 簡介
Kettle 最初由 Matt Casters 開發(fā),是 Pentaho 數(shù)據(jù)集成平臺的一部分。它提供了一個用戶友好的界面和豐富的功能集,使用戶能夠輕松地設(shè)計、執(zhí)行和監(jiān)控 ETL 任務(wù)。Kettle 通過其強大的功能和靈活性,幫助企業(yè)高效地處理大規(guī)模數(shù)據(jù)集成任務(wù)。
主要組成部分
- Spoon:
- 用途:Spoon 是 Kettle 的圖形化設(shè)計工具。用戶可以使用 Spoon 設(shè)計和調(diào)試 ETL 轉(zhuǎn)換和作業(yè)。
- 功能:拖放式界面、預(yù)覽數(shù)據(jù)、測試 ETL 流程、管理連接、編寫腳本等。
- Pan:
- 用途:Pan 是一個命令行工具,用于執(zhí)行由 Spoon 設(shè)計的 ETL 轉(zhuǎn)換。
- 功能:通過命令行執(zhí)行轉(zhuǎn)換、調(diào)度作業(yè)、集成到其他自動化流程中。
- Kitchen:
- 用途:Kitchen 是一個命令行工具,用于執(zhí)行由 Spoon 設(shè)計的 ETL 作業(yè)。
- 功能:通過命令行執(zhí)行作業(yè)、調(diào)度作業(yè)、集成到其他自動化流程中。
- Carte:
- 用途:Carte 是一個輕量級的 Web 服務(wù)器,提供遠程執(zhí)行和監(jiān)控功能。
- 功能:遠程執(zhí)行和監(jiān)控 ETL 轉(zhuǎn)換和作業(yè)、查看日志、管理集群等。
- Repositories:
- 用途:存儲和管理 ETL 轉(zhuǎn)換和作業(yè)的地方。
- 功能:可以使用數(shù)據(jù)庫或文件系統(tǒng)作為存儲庫,支持版本控制和共享。
主要功能和特點
數(shù)據(jù)提取:
- 支持多種數(shù)據(jù)源,如關(guān)系數(shù)據(jù)庫、文件(CSV、Excel、XML 等)、大數(shù)據(jù)平臺(Hadoop、Hive 等)、云存儲(Amazon S3、Google Drive 等)、Web 服務(wù)和 API 等。
數(shù)據(jù)轉(zhuǎn)換:
- 豐富的轉(zhuǎn)換步驟,包括數(shù)據(jù)清洗、數(shù)據(jù)聚合、數(shù)據(jù)過濾、數(shù)據(jù)排序、數(shù)據(jù)連接、數(shù)據(jù)拆分、數(shù)據(jù)類型轉(zhuǎn)換等。
數(shù)據(jù)加載:
- 支持將數(shù)據(jù)加載到多種目標(biāo)系統(tǒng)中,如關(guān)系數(shù)據(jù)庫、大數(shù)據(jù)平臺、文件系統(tǒng)、云存儲等。
調(diào)度和自動化:
- 支持通過命令行工具(Pan 和 Kitchen)和調(diào)度器(如 cron 或 Windows 任務(wù)計劃)進行調(diào)度和自動化執(zhí)行。
擴展性:
- 提供了插件機制,用戶可以編寫自定義插件,擴展 Kettle 的功能。
- 支持 JavaScript 和 Java 進行腳本編寫,增強轉(zhuǎn)換和作業(yè)的靈活性。
集群和并行處理:
- 支持集群模式,能夠在分布式環(huán)境中并行處理大規(guī)模數(shù)據(jù)。
- 提供了分布式 ETL 執(zhí)行和負載均衡功能。
數(shù)據(jù)質(zhì)量和數(shù)據(jù)治理:
- 提供了數(shù)據(jù)驗證、數(shù)據(jù)一致性檢查和數(shù)據(jù)校驗功能,幫助確保數(shù)據(jù)的質(zhì)量和一致性。
實時數(shù)據(jù)處理:
- 支持實時數(shù)據(jù)流處理,通過集成 Kafka、MQTT 等流處理平臺,實現(xiàn)實時數(shù)據(jù)的提取、轉(zhuǎn)換和加載。
集成 Kettle
將 Kettle(Pentaho Data Integration, PDI)集成到 Spring Boot 項目中,可以實現(xiàn) ETL 流程的自動化和集成化處理。以下是詳細的集成過程:
準(zhǔn)備工作
- 下載 Kettle:從 Pentaho 官網(wǎng)下載 Kettle(PDI)的最新版本,并解壓到本地目錄。
- Spring Boot 項目:確保已有一個 Spring Boot 項目,或新建一個 Spring Boot 項目。
引入 Kettle 依賴
在 Spring Boot 項目的 pom.xml
文件中添加 Kettle 所需的依賴。你可以將 Kettle 的 JAR 文件添加到本地 Maven 倉庫,或直接在項目中引入這些 JAR 文件。
<dependencies> <!-- Spring Boot 依賴 --> <!-- Kettle 依賴 --> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>9.4.0.0-343</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-engine</artifactId> <version>9.4.0.0-343</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-dbdialog</artifactId> <version>9.4.0.0-343</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-vfs2</artifactId> <version>2.7.0</version> </dependency> <!-- 根據(jù)需要添加其他 Kettle 依賴 --> <!-- 操作數(shù)據(jù)庫數(shù)據(jù)時添加相應(yīng)的數(shù)據(jù)庫依賴 --> </dependencies>
處理密碼加密
在 resources
目錄下創(chuàng)建 kettle-password-encoder-plugins.xml
文件,用于配置密碼加密插件:
<password-encoder-plugins> <password-encoder-plugin id="Kettle"> <description>Kettle Password Encoder</description> <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname> </password-encoder-plugin> </password-encoder-plugins>
kettle-core依賴中org.pentaho.support.encryption.KettleTwoWayPasswordEncoder類實現(xiàn)了TwoWayPasswordEncoderInterface接口,用于處理密碼的加密和解密操作。
添加 Spoon 的任務(wù)文件
在 Kettle(Pentaho Data Integration,PDI)中,作業(yè)(Job)和轉(zhuǎn)換(Transformation)是兩種核心的 ETL 組件,它們在設(shè)計和功能上有著本質(zhì)的區(qū)別。
轉(zhuǎn)換(Transformation)
- 數(shù)據(jù)處理流程:轉(zhuǎn)換是一個數(shù)據(jù)處理流程,專注于數(shù)據(jù)的提取(Extract)、轉(zhuǎn)換(Transform)和加載(Load)。
- 行級處理:轉(zhuǎn)換以行級處理數(shù)據(jù),每次處理一行數(shù)據(jù),并將其傳遞給下一步驟。
- 任務(wù)文件為.ktr文件。
作業(yè)(Job)
- 任務(wù)管理和控制流程:作業(yè)是一個任務(wù)管理和控制流程,負責(zé)調(diào)度和控制一系列任務(wù)的執(zhí)行順序。
- 步驟級處理:作業(yè)以步驟為單位處理任務(wù),每次執(zhí)行一個步驟,然后根據(jù)條件決定執(zhí)行下一個步驟。
- 任務(wù)文件為.kjb文件。
區(qū)別
- 轉(zhuǎn)換處理數(shù)據(jù)行,作業(yè)處理任務(wù)步驟。
- 轉(zhuǎn)換中的步驟是并行執(zhí)行的,而作業(yè)中的步驟是順序執(zhí)行的。
- 轉(zhuǎn)換側(cè)重于數(shù)據(jù)的處理和轉(zhuǎn)換,作業(yè)側(cè)重于任務(wù)的調(diào)度和管理。
- 轉(zhuǎn)換主要通過數(shù)據(jù)流控制,作業(yè)提供了豐富的邏輯控制(條件判斷、循環(huán)、錯誤處理等)。
- 轉(zhuǎn)換適用于復(fù)雜的數(shù)據(jù)處理流程,作業(yè)適用于任務(wù)調(diào)度和控制。
在 Spring Boot 項目的 resources
目錄下,創(chuàng)建一個 kettle
目錄,并將 Kettle 的任務(wù)文件(如 轉(zhuǎn)換1.ktr
)復(fù)制到該目錄中。
編寫 Kettle 服務(wù)類
創(chuàng)建一個服務(wù)類,用于執(zhí)行 Kettle 轉(zhuǎn)換或作業(yè)。
package com.example.kettletest.service.impl; import com.example.kettletest.service.KettleJobService; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.util.EnvUtil; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Service; import java.io.File; import java.io.IOException; /** * @author 羅森 * @date 2024/6/6 13:21 */ @Service public class KettleJobServiceImpl implements KettleJobService { @Override public void runTaskFile(String taskFileName) { // 初始化 Kettle 環(huán)境 try { KettleEnvironment.init(); EnvUtil.environmentInit(); } catch (KettleException e) { throw new RuntimeException(e); } // 執(zhí)行任務(wù)文件 if (taskFileName.endsWith(".ktr")) { taskFileKTR(taskFileName); } else if (taskFileName.endsWith(".kjb")) { taskFileKJB(taskFileName); } else { throw new IllegalArgumentException("Unsupported file type: " + taskFileName); } } /** * 針對kjb文件的操作 * @param taskFileName */ public void taskFileKJB(String taskFileName) { try { // 獲取資源文件路徑 ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName); File jobFile = resource.getFile(); // 加載 KJB 文件 JobMeta jobMeta = new JobMeta(jobFile.getAbsolutePath(), null); // 創(chuàng)建作業(yè)對象 Job job = new Job(null, jobMeta); // 啟動作業(yè) job.start(); // 等待作業(yè)完成 job.waitUntilFinished(); if (job.getErrors() > 0) { System.out.println("There were errors during job execution."); } else { System.out.println("Job executed successfully."); } } catch (IOException | KettleXMLException e) { e.printStackTrace(); } } /** * 針對ktr文件的操作 * @param taskFileName */ public void taskFileKTR(String taskFileName) { try { // 獲取資源文件路徑 ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName); File transFile = resource.getFile(); // 加載 KTR 文件 TransMeta transMeta = new TransMeta(transFile.getAbsolutePath()); // 創(chuàng)建轉(zhuǎn)換對象 Trans trans = new Trans(transMeta); // 啟動作業(yè) trans.execute(null); // 等待作業(yè)完成 trans.waitUntilFinished(); if (trans.getErrors() > 0) { System.err.println("There were errors during Transformation execution."); } else { System.out.println("Transformation executed successfully!"); } } catch (IOException | KettleException e) { e.printStackTrace(); } } }
常見問題解決辦法
運行后報錯信息為:
Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath.
**解決辦法:**在
resources
目錄下創(chuàng)建kettle-password-encoder-plugins.xml
文件。運行后報錯信息為:
ERROR (version 9.4.0.0-343, build 0.0 from 2022-11-08 07.50.27 by buildguy) : A serious error occurred during job execution: 無法找到作業(yè)的開始點.
**解決辦法:**為Spoon制作的作業(yè)任務(wù)增加開始節(jié)點。
運行后報錯信息為:
Can't run transformation due to plugin missing.
**解決辦法:**此問題通常出現(xiàn)在涉及類似于導(dǎo)出excel文件、json文件時。在初始化 Kettle 環(huán)境之前指明相關(guān)插件的絕對路徑(相關(guān)插件通常在Kettle本地解壓文件夾中的plugins目錄下),新增以下代碼:
StepPluginType.getInstance().getPluginFolders().add(new PluginFolder("E:\\Kettle\\pdi-ce-9.4.0.0-343\\data-integration\\plugins", false, true));
將代碼中的地址換成您本地的絕對地址。
到此這篇關(guān)于SpringBoot 集成 Kettle的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot 集成 Kettle內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring框架JavaMailSender發(fā)送郵件工具類詳解
這篇文章主要為大家詳細介紹了Spring框架JavaMailSender發(fā)送郵件工具類,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-04-04Java泛型實現(xiàn)類型安全的通用類型轉(zhuǎn)換器
在開發(fā)中,我們常常需要在不同類型之間進行轉(zhuǎn)換,為了提高代碼的可讀性與安全性,Java的泛型機制提供了強大的類型檢查能力,下面我們就來看看如何通過泛型實現(xiàn)類型安全的通用轉(zhuǎn)換器2024-11-11原理分析SonarQube中IdentityProvider賬戶互斥現(xiàn)象
這篇文章主要為大家介紹分析SonarQube中IdentityProvider賬戶互斥現(xiàn)象原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步2022-02-02SpringBoot 圖書管理系統(tǒng)(刪除、強制登錄、更新圖書)詳細代碼
在企業(yè)開發(fā)中,通常不采用delete語句進行物理刪除,而是使用邏輯刪除,邏輯刪除通過修改標(biāo)識字段來表示數(shù)據(jù)已被刪除,方便數(shù)據(jù)恢復(fù),本文給大家介紹SpringBoot 圖書管理系統(tǒng)實例代碼,感興趣的朋友跟隨小編一起看看吧2024-09-09spring?cloud?gateway限流常見算法實現(xiàn)
本文主要介紹了spring?cloud?gateway限流常見算法實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-02-02