SpringBoot整合dataworks的實(shí)現(xiàn)過(guò)程
注意事項(xiàng)
阿里云的dataworks提供了OpenApi, 需要是企業(yè)版或旗艦版才能夠調(diào)用,也就是付費(fèi)項(xiàng)目。
這里測(cè)試主要是調(diào)用拉取dataworks上拉取的腳本,并存儲(chǔ)到本地。
腳本包含兩部分
1、開(kāi)發(fā)的odps腳本(通過(guò)OpenApi獲取)2、建表語(yǔ)句腳本(通過(guò)dataworks信息去連接maxCompute獲取建立語(yǔ)句)
阿里云Dataworks的openApi分頁(yè)查詢限制,一次最多查詢100條。我們拉取腳本需要分多頁(yè)查詢
該項(xiàng)目使用到了MaxCompute的SDK/JDBC方式連接,SpringBoot操作MaxCompute SDK/JDBC連接
整合實(shí)現(xiàn)
實(shí)現(xiàn)主要是編寫(xiě)工具類,如果需要?jiǎng)t可以配置成SpringBean,注入容器即可使用
依賴引入
<properties>
<java.version>1.8</java.version>
<!--maxCompute-sdk-版本號(hào)-->
<max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
<!--maxCompute-jdbc-版本號(hào)-->
<max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
<!--dataworks版本號(hào)-->
<dataworks-sdk.version>3.4.2</dataworks-sdk.version>
<aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--max compute sdk-->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>${max-compute-sdk.version}</version>
</dependency>
<!--max compute jdbc-->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-jdbc</artifactId>
<version>${max-compute-jdbc.version}</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
<!--dataworks需要引入aliyun-sdk和dataworks本身-->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>${aliyun-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dataworks-public</artifactId>
<version>${dataworks-sdk.version}</version>
</dependency>
</dependencies>
請(qǐng)求參數(shù)類編寫(xiě)
/**
* @Description
* @Author itdl
* @Date 2022/08/09 15:12
*/
@Data
public class DataWorksOpenApiConnParam {
/**
* 區(qū)域 eg. cn-shanghai
*/
private String region;
/**
* 訪問(wèn)keyId
*/
private String aliyunAccessId;
/**
* 密鑰
*/
private String aliyunAccessKey;
/**
* 訪問(wèn)端點(diǎn) 就是API的URL前綴
*/
private String endPoint;
/**
* 數(shù)據(jù)庫(kù)類型 如odps
*/
private String datasourceType;
/**
* 所屬項(xiàng)目
*/
private String project;
/**
* 項(xiàng)目環(huán)境 dev prod
*/
private String projectEnv;
}
工具類編寫(xiě)
基礎(chǔ)類準(zhǔn)備,拉取腳本之后的回調(diào)函數(shù)
為什么需要回調(diào)函數(shù),因?yàn)槔〉氖撬心_本,如果合并每次分頁(yè)結(jié)果的話,會(huì)導(dǎo)致內(nèi)存溢出,而使用回調(diào)函數(shù)只是每次循環(huán)增加處理函數(shù)
/**
* @Description
* @Author itdl
* @Date 2022/08/09 15:12
*/
@Data
public class DataWorksOpenApiConnParam {
/**
* 區(qū)域 eg. cn-shanghai
*/
private String region;
/**
* 訪問(wèn)keyId
*/
private String aliyunAccessId;
/**
* 密鑰
*/
private String aliyunAccessKey;
/**
* 訪問(wèn)端點(diǎn) 就是API的URL前綴
*/
private String endPoint;
/**
* 數(shù)據(jù)庫(kù)類型 如odps
*/
private String datasourceType;
/**
* 所屬項(xiàng)目
*/
private String project;
/**
* 項(xiàng)目環(huán)境 dev prod
*/
private String projectEnv;
}
初始化操作
主要是實(shí)例化dataworks openApi接口的客戶端信息,maxCompute連接的工具類初始化(包括JDBC,SDK方式)
private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
/**默認(rèn)的odps接口地址 在Odps中也可以看到該變量*/
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
/**
* dataworks連接參數(shù)
*
*/
private final DataWorksOpenApiConnParam connParam;
/**
* 可以使用dataworks去連接maxCompute 如果連接的引擎是maxCompute的話
*/
private final MaxComputeJdbcUtil maxComputeJdbcUtil;
private final MaxComputeSdkUtil maxComputeSdkUtil;
private final boolean odpsSdk;
/**
* 客戶端
*/
private final IAcsClient client;
public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
this.connParam = connParam;
this.client = buildClient();
this.odpsSdk = odpsSdk;
if (odpsSdk){
this.maxComputeJdbcUtil = null;
this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
}else {
this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
this.maxComputeSdkUtil = null;
}
}
private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();
// 設(shè)置賬號(hào)密碼
param.setAliyunAccessId(connParam.getAliyunAccessId());
param.setAliyunAccessKey(connParam.getAliyunAccessKey());
// 設(shè)置endpoint
param.setMaxComputeEndpoint(defaultEndpoint);
// 目前只處理odps的引擎
final String datasourceType = connParam.getDatasourceType();
if (!"odps".equals(datasourceType)){
throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
}
// 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute
final String projectEnv = connParam.getProjectEnv();
if ("dev".equals(projectEnv)){
// 開(kāi)發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名
param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
}else {
// 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致
param.setProjectName(connParam.getProject());
}
return new MaxComputeSdkUtil(param);
}
private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
// 設(shè)置賬號(hào)密碼
param.setAliyunAccessId(connParam.getAliyunAccessId());
param.setAliyunAccessKey(connParam.getAliyunAccessKey());
// 設(shè)置endpoint
param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));
// 目前只處理odps的引擎
final String datasourceType = connParam.getDatasourceType();
if (!"odps".equals(datasourceType)){
throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
}
// 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute
final String projectEnv = connParam.getProjectEnv();
if ("dev".equals(projectEnv)){
// 開(kāi)發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名
param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
}else {
// 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致
param.setProjectName(connParam.getProject());
}
return new MaxComputeJdbcUtil(param);
}
調(diào)用OpenApi拉取所有腳本
/**
* 根據(jù)文件夾路徑分頁(yè)查詢?cè)撀窂较碌奈募_本)
* @param pageSize 每頁(yè)查詢多少數(shù)據(jù)
* @param folderPath 文件所在目錄
* @param userType 文件所屬功能模塊 可不傳
* @param fileTypes 設(shè)置文件代碼類型 逗號(hào)分割 可不傳
*/
public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
pageSize = setPageSize(pageSize);
// 創(chuàng)建請(qǐng)求
final ListFilesRequest request = new ListFilesRequest();
// 設(shè)置分頁(yè)參數(shù)
request.setPageNumber(1);
request.setPageSize(pageSize);
// 設(shè)置上級(jí)文件夾
request.setFileFolderPath(folderPath);
// 設(shè)置區(qū)域和項(xiàng)目名稱
request.setSysRegionId(connParam.getRegion());
request.setProjectIdentifier(connParam.getProject());
// 設(shè)置文件所屬功能模塊
if (!ObjectUtils.isEmpty(userType)){
request.setUseType(userType);
}
// 設(shè)置文件代碼類型
if (!ObjectUtils.isEmpty(fileTypes)){
request.setFileTypes(fileTypes);
}
// 發(fā)起請(qǐng)求
ListFilesResponse res = client.getAcsResponse(request);
// 獲取分頁(yè)總數(shù)
final Integer totalCount = res.getData().getTotalCount();
// 返回結(jié)果
final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
// 計(jì)算能分幾頁(yè)
long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
// 只有1頁(yè) 直接返回
if (pages <= 1){
callBack.handle(resultList);
return;
}
// 第一頁(yè)執(zhí)行回調(diào)
callBack.handle(resultList);
// 分頁(yè)數(shù)據(jù) 從第二頁(yè)開(kāi)始查詢 同步拉取,可以優(yōu)化為多線程拉取
for (int i = 2; i <= pages; i++) {
//第1頁(yè)
request.setPageNumber(i);
//每頁(yè)大小
request.setPageSize(pageSize);
// 發(fā)起請(qǐng)求
res = client.getAcsResponse(request);
final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
if (!ObjectUtils.isEmpty(tableEntityList)){
// 執(zhí)行回調(diào)函數(shù)
callBack.handle(tableEntityList);
}
}
}
內(nèi)部連接MaxCompute拉取所有DDL腳本內(nèi)容
DataWorks工具類代碼,通過(guò)回調(diào)函數(shù)處理
/**
* 獲取所有的DDL腳本
* @param callBack 回調(diào)處理函數(shù)
*/
public void listAllDdl(CallBack.DdlCallBack callBack){
if (odpsSdk){
final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos();
for (TableMetaInfo tableInfo : tableInfos) {
final String tableName = tableInfo.getTableName();
final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName);
callBack.handle(tableName, sqlCreateDesc);
}
}
}
MaxCompute工具類代碼,根據(jù)表名獲取建表語(yǔ)句, 以SDK為例, JDBC直接執(zhí)行show create table即可拿到建表語(yǔ)句
/**
* 根據(jù)表名獲取建表語(yǔ)句
* @param tableName 表名
* @return
*/
public String getSqlCreateDesc(String tableName) {
final Table table = odps.tables().get(tableName);
// 建表語(yǔ)句
StringBuilder mssqlDDL = new StringBuilder();
// 獲取表結(jié)構(gòu)
TableSchema tableSchema = table.getSchema();
// 獲取表名表注釋
String tableComment = table.getComment();
//獲取列名列注釋
List<Column> columns = tableSchema.getColumns();
/*組裝成mssql的DDL*/
// 表名
mssqlDDL.append("CREATE TABLE IF NOT EXISTS ");
mssqlDDL.append(tableName).append("\n");
mssqlDDL.append(" (\n");
//列字段
int index = 1;
for (Column column : columns) {
mssqlDDL.append(" ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName());
if (!ObjectUtils.isEmpty(column.getComment())) {
mssqlDDL.append(" COMMENT '").append(column.getComment()).append("'");
}
if (index == columns.size()) {
mssqlDDL.append("\n");
} else {
mssqlDDL.append(",\n");
}
index++;
}
mssqlDDL.append(" )\n");
//獲取分區(qū)
List<Column> partitionColumns = tableSchema.getPartitionColumns();
int partitionIndex = 1;
if (!ObjectUtils.isEmpty(partitionColumns)) {
mssqlDDL.append("PARTITIONED BY (");
}
for (Column partitionColumn : partitionColumns) {
final String format = String.format("%s %s COMMENT '%s'", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment());
mssqlDDL.append(format);
if (partitionIndex == partitionColumns.size()) {
mssqlDDL.append("\n");
} else {
mssqlDDL.append(",\n");
}
partitionIndex++;
}
if (!ObjectUtils.isEmpty(partitionColumns)) {
mssqlDDL.append(")\n");
}
// mssqlDDL.append("STORED AS ALIORC \n");
// mssqlDDL.append("TBLPROPERTIES ('comment'='").append(tableComment).append("');");
mssqlDDL.append(";");
return mssqlDDL.toString();
}
測(cè)試代碼
public static void main(String[] args) throws ClientException {
final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam();
connParam.setAliyunAccessId("您的阿里云賬號(hào)accessId");
connParam.setAliyunAccessKey("您的阿里云賬號(hào)accessKey");
// dataworks所在區(qū)域
connParam.setRegion("cn-chengdu");
// dataworks所屬項(xiàng)目
connParam.setProject("dataworks所屬項(xiàng)目");
// dataworks所屬項(xiàng)目環(huán)境 如果不分環(huán)境的話設(shè)置為生產(chǎn)即可
connParam.setProjectEnv("dev");
// 數(shù)據(jù)引擎類型 odps
connParam.setDatasourceType("odps");
// ddataworks接口地址
connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com");
final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true);
// 拉取所有ODPS腳本
dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> {
// 處理文件
for (ListFilesResponse.Data.File file : files) {
final String fileName = file.getFileName();
System.out.println(fileName);
}
});
// 拉取所有表的建表語(yǔ)句
dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> {
System.out.println("=======================================");
System.out.println("表名:" + tableName + "內(nèi)容如下:\n");
System.out.println(tableDdlContent);
System.out.println("=======================================");
});
}
測(cè)試結(jié)果
test_001腳本
test_002腳本
test_003腳本
test_004腳本
test_005腳本
=======================================
表名:test_abc_info內(nèi)容如下:CREATE TABLE IF NOT EXISTS test_abc_info
(
test_abc1 STRING COMMENT '字段1',
test_abc2 STRING COMMENT '字段2',
test_abc3 STRING COMMENT '字段3',
test_abc4 STRING COMMENT '字段4',
test_abc5 STRING COMMENT '字段5',
test_abc6 STRING COMMENT '字段6',
test_abc7 STRING COMMENT '字段7',
test_abc8 STRING COMMENT '字段8'
)
PARTITIONED BY (p_date STRING COMMENT '數(shù)據(jù)日期'
)
;
=======================================
Disconnected from the target VM, address: '127.0.0.1:59509', transport: 'socket'
項(xiàng)目地址
https://github.com/HedongLin123/dataworks_odps_demo
到此這篇關(guān)于SpringBoot整合dataworks的實(shí)現(xiàn)過(guò)程的文章就介紹到這了,更多相關(guān)SpringBoot整合dataworks內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring事件監(jiān)聽(tīng)源碼解析流程分析
spring事件監(jiān)聽(tīng)機(jī)制離不開(kāi)容器IOC特性提供的支持,比如容器會(huì)自動(dòng)創(chuàng)建事件發(fā)布器,自動(dòng)識(shí)別用戶注冊(cè)的監(jiān)聽(tīng)器并進(jìn)行管理,在特定的事件發(fā)布后會(huì)找到對(duì)應(yīng)的事件監(jiān)聽(tīng)器并對(duì)其監(jiān)聽(tīng)方法進(jìn)行回調(diào),這篇文章主要介紹了Spring事件監(jiān)聽(tīng)源碼解析,需要的朋友可以參考下2023-08-08
springboot實(shí)現(xiàn)極驗(yàn)校驗(yàn)的項(xiàng)目實(shí)踐
在系統(tǒng)業(yè)務(wù)中,需要想客戶發(fā)送手機(jī)驗(yàn)證碼,進(jìn)行驗(yàn)證后,才能提交,本文主要介紹了springboot實(shí)現(xiàn)極驗(yàn)校驗(yàn)的項(xiàng)目實(shí)踐,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09
SpringBoot從yml配置文件中讀常用參數(shù)值實(shí)例方法
在本篇文章里小編給大家整理了關(guān)于SpringBoot從yml配置文件中讀常用參數(shù)值實(shí)例方法,有需要的朋友們學(xué)習(xí)下。2019-12-12
Spring容器刷新obtainFreshBeanFactory示例詳解
這篇文章主要為大家介紹了Spring容器刷新obtainFreshBeanFactory示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解
這篇文章主要介紹了Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解的相關(guān)資料,這里提供了實(shí)現(xiàn)實(shí)例,幫助大家學(xué)習(xí)理解這部分內(nèi)容,需要的朋友可以參考下2017-08-08
java實(shí)現(xiàn)的小時(shí)鐘示例分享
這篇文章主要介紹了java實(shí)現(xiàn)的小時(shí)鐘示例,需要的朋友可以參考下2014-02-02
Java使用OTP動(dòng)態(tài)口令(每分鐘變一次)進(jìn)行登錄認(rèn)證
這篇文章主要介紹了Java使用OTP動(dòng)態(tài)口令(每分鐘變一次)進(jìn)行登錄認(rèn)證,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09

