欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合dataworks的實(shí)現(xiàn)過程

 更新時間:2022年08月12日 11:55:59   作者:IT_DLin  
這篇文章主要介紹了SpringBoot整合dataworks的實(shí)現(xiàn)過程,實(shí)現(xiàn)主要是編寫工具類,如果需要則可以配置成SpringBean,注入容器即可使用,需要的朋友可以參考下

注意事項(xiàng)

阿里云的dataworks提供了OpenApi, 需要是企業(yè)版或旗艦版才能夠調(diào)用,也就是付費(fèi)項(xiàng)目。

這里測試主要是調(diào)用拉取dataworks上拉取的腳本,并存儲到本地。
腳本包含兩部分

1、開發(fā)的odps腳本(通過OpenApi獲取)2、建表語句腳本(通過dataworks信息去連接maxCompute獲取建立語句)

阿里云Dataworks的openApi分頁查詢限制,一次最多查詢100條。我們拉取腳本需要分多頁查詢

該項(xiàng)目使用到了MaxCompute的SDK/JDBC方式連接,SpringBoot操作MaxCompute SDK/JDBC連接

整合實(shí)現(xiàn)

實(shí)現(xiàn)主要是編寫工具類,如果需要則可以配置成SpringBean,注入容器即可使用

依賴引入

<properties>
    <java.version>1.8</java.version>
    <!--maxCompute-sdk-版本號-->
    <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
    <!--maxCompute-jdbc-版本號-->
    <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
    <!--dataworks版本號-->
    <dataworks-sdk.version>3.4.2</dataworks-sdk.version>
    <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
</properties>
<dependencies>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!--max compute sdk-->
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-core</artifactId>
    <version>${max-compute-sdk.version}</version>
</dependency>
<!--max compute jdbc-->
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-jdbc</artifactId>
    <version>${max-compute-jdbc.version}</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
<!--dataworks需要引入aliyun-sdk和dataworks本身-->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>${aliyun-java-sdk.version}</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>${dataworks-sdk.version}</version>
</dependency>
</dependencies>

請求參數(shù)類編寫

/**
 * @Description
 * @Author itdl
 * @Date 2022/08/09 15:12
 */
@Data
public class DataWorksOpenApiConnParam {
    /**
     * 區(qū)域 eg. cn-shanghai
     */
    private String region;

    /**
     * 訪問keyId
     */
    private String aliyunAccessId;
    /**
     * 密鑰
     */
    private String aliyunAccessKey;

    /**
     * 訪問端點(diǎn)  就是API的URL前綴
     */
    private String endPoint;

    /**
     * 數(shù)據(jù)庫類型 如odps
     */
    private String datasourceType;

    /**
     * 所屬項(xiàng)目
     */
    private String project;

    /**
     * 項(xiàng)目環(huán)境 dev  prod
     */
    private String projectEnv;
}

工具類編寫

基礎(chǔ)類準(zhǔn)備,拉取腳本之后的回調(diào)函數(shù)

為什么需要回調(diào)函數(shù),因?yàn)槔〉氖撬心_本,如果合并每次分頁結(jié)果的話,會導(dǎo)致內(nèi)存溢出,而使用回調(diào)函數(shù)只是每次循環(huán)增加處理函數(shù)

/**
 * @Description
 * @Author itdl
 * @Date 2022/08/09 15:12
 */
@Data
public class DataWorksOpenApiConnParam {
    /**
     * 區(qū)域 eg. cn-shanghai
     */
    private String region;

    /**
     * 訪問keyId
     */
    private String aliyunAccessId;
    /**
     * 密鑰
     */
    private String aliyunAccessKey;

    /**
     * 訪問端點(diǎn)  就是API的URL前綴
     */
    private String endPoint;

    /**
     * 數(shù)據(jù)庫類型 如odps
     */
    private String datasourceType;

    /**
     * 所屬項(xiàng)目
     */
    private String project;

    /**
     * 項(xiàng)目環(huán)境 dev  prod
     */
    private String projectEnv;
}

初始化操作

主要是實(shí)例化dataworks openApi接口的客戶端信息,maxCompute連接的工具類初始化(包括JDBC,SDK方式)

private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
/**默認(rèn)的odps接口地址 在Odps中也可以看到該變量*/
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
/**
 * dataworks連接參數(shù)
 *
 */
private final DataWorksOpenApiConnParam connParam;

/**
 * 可以使用dataworks去連接maxCompute 如果連接的引擎是maxCompute的話
 */
private final MaxComputeJdbcUtil maxComputeJdbcUtil;

private final MaxComputeSdkUtil maxComputeSdkUtil;

private final boolean odpsSdk;


/**
 * 客戶端
 */
private final IAcsClient client;

public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
    this.connParam = connParam;
    this.client = buildClient();
    this.odpsSdk = odpsSdk;
    if (odpsSdk){
        this.maxComputeJdbcUtil = null;
        this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
    }else {
        this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
        this.maxComputeSdkUtil = null;
    }
}

private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
    final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();

    // 設(shè)置賬號密碼
    param.setAliyunAccessId(connParam.getAliyunAccessId());
    param.setAliyunAccessKey(connParam.getAliyunAccessKey());

    // 設(shè)置endpoint
    param.setMaxComputeEndpoint(defaultEndpoint);

    // 目前只處理odps的引擎
    final String datasourceType = connParam.getDatasourceType();
    if (!"odps".equals(datasourceType)){
        throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
    }

    // 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute
    final String projectEnv = connParam.getProjectEnv();

    if ("dev".equals(projectEnv)){
        // 開發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名
        param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
    }else {
        // 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致
        param.setProjectName(connParam.getProject());
    }

    return new MaxComputeSdkUtil(param);
}

private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
    final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();

    // 設(shè)置賬號密碼
    param.setAliyunAccessId(connParam.getAliyunAccessId());
    param.setAliyunAccessKey(connParam.getAliyunAccessKey());

    // 設(shè)置endpoint
    param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));

    // 目前只處理odps的引擎
    final String datasourceType = connParam.getDatasourceType();
    if (!"odps".equals(datasourceType)){
        throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
    }

    // 獲取項(xiàng)目環(huán)境,根據(jù)項(xiàng)目環(huán)境連接不同的maxCompute
    final String projectEnv = connParam.getProjectEnv();

    if ("dev".equals(projectEnv)){
        // 開發(fā)環(huán)境dataworks + _dev就是maxCompute的項(xiàng)目名
        param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
    }else {
        // 生產(chǎn)環(huán)境dataworks的項(xiàng)目名和maxCompute一致
        param.setProjectName(connParam.getProject());
    }

    return new MaxComputeJdbcUtil(param);
}

調(diào)用OpenApi拉取所有腳本

/**
 * 根據(jù)文件夾路徑分頁查詢該路徑下的文件(腳本)
 * @param pageSize 每頁查詢多少數(shù)據(jù)
 * @param folderPath 文件所在目錄
 * @param userType 文件所屬功能模塊 可不傳
 * @param fileTypes 設(shè)置文件代碼類型 逗號分割 可不傳
 */
public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
    pageSize = setPageSize(pageSize);
    // 創(chuàng)建請求
    final ListFilesRequest request = new ListFilesRequest();

    // 設(shè)置分頁參數(shù)
    request.setPageNumber(1);
    request.setPageSize(pageSize);

    // 設(shè)置上級文件夾
    request.setFileFolderPath(folderPath);

    // 設(shè)置區(qū)域和項(xiàng)目名稱
    request.setSysRegionId(connParam.getRegion());
    request.setProjectIdentifier(connParam.getProject());

    // 設(shè)置文件所屬功能模塊
    if (!ObjectUtils.isEmpty(userType)){
        request.setUseType(userType);
    }
    // 設(shè)置文件代碼類型
    if (!ObjectUtils.isEmpty(fileTypes)){
        request.setFileTypes(fileTypes);
    }

    // 發(fā)起請求
    ListFilesResponse res = client.getAcsResponse(request);

    // 獲取分頁總數(shù)
    final Integer totalCount = res.getData().getTotalCount();
    // 返回結(jié)果
    final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
    // 計(jì)算能分幾頁
    long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
    // 只有1頁 直接返回
    if (pages <= 1){
        callBack.handle(resultList);
        return;
    }

    // 第一頁執(zhí)行回調(diào)
    callBack.handle(resultList);

    // 分頁數(shù)據(jù) 從第二頁開始查詢 同步拉取,可以優(yōu)化為多線程拉取
    for (int i = 2; i <= pages; i++) {
        //第1頁
        request.setPageNumber(i);
        //每頁大小
        request.setPageSize(pageSize);
        // 發(fā)起請求
        res = client.getAcsResponse(request);
        final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
        if (!ObjectUtils.isEmpty(tableEntityList)){
            // 執(zhí)行回調(diào)函數(shù)
            callBack.handle(tableEntityList);
        }
    }
}

內(nèi)部連接MaxCompute拉取所有DDL腳本內(nèi)容

DataWorks工具類代碼,通過回調(diào)函數(shù)處理

    /**
     * 獲取所有的DDL腳本
     * @param callBack 回調(diào)處理函數(shù)
     */
    public void listAllDdl(CallBack.DdlCallBack callBack){
        if (odpsSdk){
            final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos();
            for (TableMetaInfo tableInfo : tableInfos) {
                final String tableName = tableInfo.getTableName();
                final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName);
                callBack.handle(tableName, sqlCreateDesc);
            }
        }
    }

MaxCompute工具類代碼,根據(jù)表名獲取建表語句, 以SDK為例, JDBC直接執(zhí)行show create table即可拿到建表語句

/**
 * 根據(jù)表名獲取建表語句
 * @param tableName 表名
 * @return
 */
public String getSqlCreateDesc(String tableName) {
    final Table table = odps.tables().get(tableName);
    // 建表語句
    StringBuilder mssqlDDL = new StringBuilder();

    // 獲取表結(jié)構(gòu)
    TableSchema tableSchema = table.getSchema();
    // 獲取表名表注釋
    String tableComment = table.getComment();

    //獲取列名列注釋
    List<Column> columns = tableSchema.getColumns();
    /*組裝成mssql的DDL*/
    // 表名
    mssqlDDL.append("CREATE TABLE IF NOT EXISTS ");
    mssqlDDL.append(tableName).append("\n");
    mssqlDDL.append(" (\n");
    //列字段
    int index = 1;
    for (Column column : columns) {
        mssqlDDL.append("  ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName());
        if (!ObjectUtils.isEmpty(column.getComment())) {
            mssqlDDL.append(" COMMENT '").append(column.getComment()).append("'");
        }
        if (index == columns.size()) {
            mssqlDDL.append("\n");
        } else {
            mssqlDDL.append(",\n");
        }
        index++;
    }
    mssqlDDL.append(" )\n");
    //獲取分區(qū)
    List<Column> partitionColumns = tableSchema.getPartitionColumns();
    int partitionIndex = 1;
    if (!ObjectUtils.isEmpty(partitionColumns)) {
        mssqlDDL.append("PARTITIONED BY (");
    }
    for (Column partitionColumn : partitionColumns) {
        final String format = String.format("%s %s COMMENT '%s'", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment());
        mssqlDDL.append(format);
        if (partitionIndex == partitionColumns.size()) {
            mssqlDDL.append("\n");
        } else {
            mssqlDDL.append(",\n");
        }
        partitionIndex++;
    }

    if (!ObjectUtils.isEmpty(partitionColumns)) {
        mssqlDDL.append(")\n");
    }
//        mssqlDDL.append("STORED AS ALIORC  \n");
//        mssqlDDL.append("TBLPROPERTIES ('comment'='").append(tableComment).append("');");
    mssqlDDL.append(";");
    return mssqlDDL.toString();
}

測試代碼

public static void main(String[] args) throws ClientException {
    final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam();
    connParam.setAliyunAccessId("您的阿里云賬號accessId");
    connParam.setAliyunAccessKey("您的阿里云賬號accessKey");
    // dataworks所在區(qū)域
    connParam.setRegion("cn-chengdu");
    // dataworks所屬項(xiàng)目
    connParam.setProject("dataworks所屬項(xiàng)目");
    // dataworks所屬項(xiàng)目環(huán)境 如果不分環(huán)境的話設(shè)置為生產(chǎn)即可
    connParam.setProjectEnv("dev");
    // 數(shù)據(jù)引擎類型 odps
    connParam.setDatasourceType("odps");
    // ddataworks接口地址
    connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com");
    final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true);

    // 拉取所有ODPS腳本
    dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> {
        // 處理文件
        for (ListFilesResponse.Data.File file : files) {
            final String fileName = file.getFileName();
            System.out.println(fileName);
        }
    });

    // 拉取所有表的建表語句
    dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> {
        System.out.println("=======================================");
        System.out.println("表名:" + tableName + "內(nèi)容如下:\n");
        System.out.println(tableDdlContent);
        System.out.println("=======================================");
    });
}

測試結(jié)果

test_001腳本
test_002腳本
test_003腳本
test_004腳本
test_005腳本
=======================================
表名:test_abc_info內(nèi)容如下:

CREATE TABLE IF NOT EXISTS test_abc_info
 (
    test_abc1        STRING COMMENT '字段1',
    test_abc2        STRING COMMENT '字段2',
    test_abc3        STRING COMMENT '字段3',
    test_abc4        STRING COMMENT '字段4',
    test_abc5        STRING COMMENT '字段5',
    test_abc6        STRING COMMENT '字段6',
    test_abc7        STRING COMMENT '字段7',
    test_abc8        STRING COMMENT '字段8'
 )
PARTITIONED BY (p_date STRING COMMENT '數(shù)據(jù)日期'
)
;
=======================================
Disconnected from the target VM, address: '127.0.0.1:59509', transport: 'socket'

項(xiàng)目地址

https://github.com/HedongLin123/dataworks_odps_demo

到此這篇關(guān)于SpringBoot整合dataworks的實(shí)現(xiàn)過程的文章就介紹到這了,更多相關(guān)SpringBoot整合dataworks內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java追加寫入txt文件的方法總結(jié)

    java追加寫入txt文件的方法總結(jié)

    在本篇文章里我們給大家整理了關(guān)于java如何追加寫入txt文件的方法和代碼,需要的朋友們可以參考下。
    2020-02-02
  • Spring事件監(jiān)聽源碼解析流程分析

    Spring事件監(jiān)聽源碼解析流程分析

    spring事件監(jiān)聽機(jī)制離不開容器IOC特性提供的支持,比如容器會自動創(chuàng)建事件發(fā)布器,自動識別用戶注冊的監(jiān)聽器并進(jìn)行管理,在特定的事件發(fā)布后會找到對應(yīng)的事件監(jiān)聽器并對其監(jiān)聽方法進(jìn)行回調(diào),這篇文章主要介紹了Spring事件監(jiān)聽源碼解析,需要的朋友可以參考下
    2023-08-08
  • springboot實(shí)現(xiàn)極驗(yàn)校驗(yàn)的項(xiàng)目實(shí)踐

    springboot實(shí)現(xiàn)極驗(yàn)校驗(yàn)的項(xiàng)目實(shí)踐

    在系統(tǒng)業(yè)務(wù)中,需要想客戶發(fā)送手機(jī)驗(yàn)證碼,進(jìn)行驗(yàn)證后,才能提交,本文主要介紹了springboot實(shí)現(xiàn)極驗(yàn)校驗(yàn)的項(xiàng)目實(shí)踐,具有一定的參考價值,感興趣的可以了解一下
    2023-09-09
  • java中文分詞之正向最大匹配法實(shí)例代碼

    java中文分詞之正向最大匹配法實(shí)例代碼

    中文分詞應(yīng)用很廣泛,網(wǎng)上也有很多開源項(xiàng)目,下面這篇文章主要給大家介紹了關(guān)于java中文分詞之正向最大匹配法的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-11-11
  • SpringBoot從yml配置文件中讀常用參數(shù)值實(shí)例方法

    SpringBoot從yml配置文件中讀常用參數(shù)值實(shí)例方法

    在本篇文章里小編給大家整理了關(guān)于SpringBoot從yml配置文件中讀常用參數(shù)值實(shí)例方法,有需要的朋友們學(xué)習(xí)下。
    2019-12-12
  • Spring容器刷新obtainFreshBeanFactory示例詳解

    Spring容器刷新obtainFreshBeanFactory示例詳解

    這篇文章主要為大家介紹了Spring容器刷新obtainFreshBeanFactory示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-03-03
  • 分布式Netty源碼分析概覽

    分布式Netty源碼分析概覽

    這篇文章主要為大家介紹了分布式Netty源碼分析概覽,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-03-03
  • Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解

    Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解

    這篇文章主要介紹了Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解的相關(guān)資料,這里提供了實(shí)現(xiàn)實(shí)例,幫助大家學(xué)習(xí)理解這部分內(nèi)容,需要的朋友可以參考下
    2017-08-08
  • java實(shí)現(xiàn)的小時鐘示例分享

    java實(shí)現(xiàn)的小時鐘示例分享

    這篇文章主要介紹了java實(shí)現(xiàn)的小時鐘示例,需要的朋友可以參考下
    2014-02-02
  • Java使用OTP動態(tài)口令(每分鐘變一次)進(jìn)行登錄認(rèn)證

    Java使用OTP動態(tài)口令(每分鐘變一次)進(jìn)行登錄認(rèn)證

    這篇文章主要介紹了Java使用OTP動態(tài)口令(每分鐘變一次)進(jìn)行登錄認(rèn)證,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09

最新評論