欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PowerJob?AbstractSqlProcessor方法工作流程源碼解讀

 更新時間:2024年01月12日 09:45:25   作者:codecraft  
這篇文章主要為大家介紹了PowerJob?AbstractSqlProcessor方法工作流程源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

本文主要研究一下PowerJob的AbstractSqlProcessor

AbstractSqlProcessor

tech/powerjob/official/processors/impl/sql/AbstractSqlProcessor.java

@Slf4j
public abstract class AbstractSqlProcessor extends CommonBasicProcessor {
    /**
     * 默認(rèn)超時時間
     */
    protected static final int DEFAULT_TIMEOUT = 60;
    /**
     * name => SQL validator
     * 注意 :
     * - 返回 true 表示驗證通過
     * - 返回 false 表示 SQL 非法,將被拒絕執(zhí)行
     */
    protected final Map<String, Predicate<String>> sqlValidatorMap = Maps.newConcurrentMap();
    /**
     * 自定義 SQL 解析器
     */
    protected SqlParser sqlParser;
    private static final Joiner JOINER = Joiner.on("|").useForNull("-");
    @Override
    public ProcessResult process0(TaskContext taskContext) {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        // 解析參數(shù)
        SqlParams sqlParams = extractParams(taskContext);
        omsLogger.info("origin sql params: {}", JSON.toJSON(sqlParams));
        // 校驗參數(shù)
        validateParams(sqlParams);
        StopWatch stopWatch = new StopWatch(this.getClass().getSimpleName());
        // 解析
        stopWatch.start("Parse SQL");
        if (sqlParser != null) {
            omsLogger.info("before parse sql: {}", sqlParams.getSql());
            String newSQL = sqlParser.parse(sqlParams.getSql(), taskContext);
            sqlParams.setSql(newSQL);
            omsLogger.info("after parse sql: {}", newSQL);
        }
        stopWatch.stop();
        // 校驗 SQL
        stopWatch.start("Validate SQL");
        validateSql(sqlParams.getSql(), omsLogger);
        stopWatch.stop();
        // 執(zhí)行
        stopWatch.start("Execute SQL");
        omsLogger.info("final sql params: {}", JSON.toJSON(sqlParams));
        executeSql(sqlParams, taskContext);
        stopWatch.stop();
        omsLogger.info(stopWatch.prettyPrint());
        String message = String.format("execute successfully, used time: %s millisecond", stopWatch.getTotalTimeMillis());
        return new ProcessResult(true, message);
    }
    abstract Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException;
    public void setSqlParser(SqlParser sqlParser) {
        this.sqlParser = sqlParser;
    }
    public void registerSqlValidator(String validatorName, Predicate<String> sqlValidator) {
        sqlValidatorMap.put(validatorName, sqlValidator);
        log.info("register sql validator({})' successfully.", validatorName);
    }
    //......
}
AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗,針對sqlParser不為null的會通過sqlParser進(jìn)行解析,接著通過validateSql校驗sql,最后通過executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法

SqlParams

@Data
    public static class SqlParams {
        /**
         * 數(shù)據(jù)源名稱
         */
        private String dataSourceName;
        /**
         * 需要執(zhí)行的 SQL
         */
        private String sql;
        /**
         * 超時時間
         */
        private Integer timeout;
        /**
         * jdbc url
         * 具體格式可參考 https://www.baeldung.com/java-jdbc-url-format
         */
        private String jdbcUrl;
        /**
         * 是否展示 SQL 執(zhí)行結(jié)果
         */
        private boolean showResult;
    }
SqlParams定義了dataSourceName、sql、timeout、jdbcUrl、showResult屬性

validateSql

private void validateSql(String sql, OmsLogger omsLogger) {
        if (sqlValidatorMap.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Predicate<String>> entry : sqlValidatorMap.entrySet()) {
            Predicate<String> validator = entry.getValue();
            if (!validator.test(sql)) {
                omsLogger.error("validate sql by validator[{}] failed, skip to process!", entry.getKey());
                throw new IllegalArgumentException("illegal sql, can't pass the validation of " + entry.getKey());
            }
        }
    }
validateSql遍歷sqlValidatorMap,挨個執(zhí)行test方法,驗證不通過拋出IllegalArgumentException

executeSql

@SneakyThrows
    private void executeSql(SqlParams sqlParams, TaskContext ctx) {
        OmsLogger omsLogger = ctx.getOmsLogger();
        boolean originAutoCommitFlag ;
        try (Connection connection = getConnection(sqlParams, ctx)) {
            originAutoCommitFlag = connection.getAutoCommit();
            connection.setAutoCommit(false);
            try (Statement statement = connection.createStatement()) {
                statement.setQueryTimeout(sqlParams.getTimeout() == null ? DEFAULT_TIMEOUT : sqlParams.getTimeout());
                statement.execute(sqlParams.getSql());
                connection.commit();
                if (sqlParams.showResult) {
                    outputSqlResult(statement, omsLogger);
                }
            } catch (Throwable e) {
                omsLogger.error("execute sql failed, try to rollback", e);
                connection.rollback();
                throw e;
            } finally {
                connection.setAutoCommit(originAutoCommitFlag);
            }
        }
    }
executeSql通過getConnection獲取連接,設(shè)置為手動提交,然后創(chuàng)建Statement,設(shè)置queryTimeout,執(zhí)行,最后提交,針對showResult的執(zhí)行outputSqlResult

outputSqlResult

private void outputSqlResult(Statement statement, OmsLogger omsLogger) throws SQLException {
        omsLogger.info("====== SQL EXECUTE RESULT ======");

        for (int index = 0; index < Integer.MAX_VALUE; index++) {

            // 某一個結(jié)果集
            ResultSet resultSet = statement.getResultSet();
            if (resultSet != null) {
                try (ResultSet rs = resultSet) {
                    int columnCount = rs.getMetaData().getColumnCount();
                    List<String> columnNames = Lists.newLinkedList();
                    //column – the first column is 1, the second is 2, ...
                    for (int i = 1; i <= columnCount; i++) {
                        columnNames.add(rs.getMetaData().getColumnName(i));
                    }
                    omsLogger.info("[Result-{}] [Columns] {}" + System.lineSeparator(), index, JOINER.join(columnNames));
                    int rowIndex = 0;
                    List<Object> row = Lists.newLinkedList();
                    while (rs.next()) {
                        for (int i = 1; i <= columnCount; i++) {
                            row.add(rs.getObject(i));
                        }
                        omsLogger.info("[Result-{}] [Row-{}] {}" + System.lineSeparator(), index, rowIndex++, JOINER.join(row));
                    }
                }
            } else {
                int updateCount = statement.getUpdateCount();
                if (updateCount != -1) {
                    omsLogger.info("[Result-{}] update count: {}", index, updateCount);
                }
            }
            if (((!statement.getMoreResults()) && (statement.getUpdateCount() == -1))) {
                break;
            }
        }
        omsLogger.info("====== SQL EXECUTE RESULT ======");
    }
outputSqlResult從statement獲取resultSet,然后打印columnName,在打印每行數(shù)據(jù),對于更新操作則打印updateCount

SqlParser

@FunctionalInterface
    public interface SqlParser {
        /**
         * 自定義 SQL 解析邏輯
         *
         * @param sql         原始 SQL 語句
         * @param taskContext 任務(wù)上下文
         * @return 解析后的 SQL
         */
        String parse(String sql, TaskContext taskContext);
    }
SqlParser接口定義了parse方法

DynamicDatasourceSqlProcessor

tech/powerjob/official/processors/impl/sql/DynamicDatasourceSqlProcessor.java

public class DynamicDatasourceSqlProcessor extends AbstractSqlProcessor {
    @Override
    protected void validateParams(SqlParams sqlParams) {
        if (StringUtils.isEmpty(sqlParams.getJdbcUrl())) {
            throw new IllegalArgumentException("jdbcUrl can't be empty in DynamicDatasourceSqlProcessor!");
        }
    }
    @Override
    Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {
        JSONObject params = JSONObject.parseObject(CommonUtils.parseParams(taskContext));
        Properties properties = new Properties();
        // normally at least a "user" and "password" property should be included
        params.forEach((k, v) -> properties.setProperty(k, String.valueOf(v)));
        return DriverManager.getConnection(sqlParams.getJdbcUrl(), properties);
    }
    @Override
    protected String getSecurityDKey() {
        return SecurityUtils.ENABLE_DYNAMIC_SQL_PROCESSOR;
    }
}
DynamicDatasourceSqlProcessor繼承了AbstractSqlProcessor,其validateParams要求jdbcUrl不能為空,其getConnection方法會從taskContext提取properties作為DriverManager.getConnection的屬性,其getSecurityDKey返回的是powerjob.official-processor.dynamic-datasource.enable配置

SpringDatasourceSqlProcessor

tech/powerjob/official/processors/impl/sql/SpringDatasourceSqlProcessor.java

@Slf4j
public class SpringDatasourceSqlProcessor extends AbstractSqlProcessor {
    /**
     * 默認(rèn)的數(shù)據(jù)源名稱
     */
    private static final String DEFAULT_DATASOURCE_NAME = "default";
    /**
     * name => data source
     */
    private final Map<String, DataSource> dataSourceMap;
    /**
     * 指定默認(rèn)的數(shù)據(jù)源
     *
     * @param defaultDataSource 默認(rèn)數(shù)據(jù)源
     */
    public SpringDatasourceSqlProcessor(DataSource defaultDataSource) {
        dataSourceMap = Maps.newConcurrentMap();
        registerDataSource(DEFAULT_DATASOURCE_NAME, defaultDataSource);
    }
    @Override
    Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {
        return dataSourceMap.get(sqlParams.getDataSourceName()).getConnection();
    }
    /**
     * 校驗參數(shù),如果校驗不通過直接拋異常
     *
     * @param sqlParams SQL 參數(shù)信息
     */
    @Override
    protected void validateParams(SqlParams sqlParams) {
        // 檢查數(shù)據(jù)源
        if (StringUtils.isEmpty(sqlParams.getDataSourceName())) {
            // use the default data source when current data source name is empty
            sqlParams.setDataSourceName(DEFAULT_DATASOURCE_NAME);
        }
        dataSourceMap.computeIfAbsent(sqlParams.getDataSourceName(), dataSourceName -> {
            throw new IllegalArgumentException("can't find data source with name " + dataSourceName);
        });
    }
    /**
     * 注冊數(shù)據(jù)源
     *
     * @param dataSourceName 數(shù)據(jù)源名稱
     * @param dataSource     數(shù)據(jù)源
     */
    public void registerDataSource(String dataSourceName, DataSource dataSource) {
        Objects.requireNonNull(dataSourceName, "DataSource name must not be null");
        Objects.requireNonNull(dataSource, "DataSource must not be null");
        dataSourceMap.put(dataSourceName, dataSource);
        log.info("register data source({})' successfully.", dataSourceName);
    }
    /**
     * 移除數(shù)據(jù)源
     *
     * @param dataSourceName 數(shù)據(jù)源名稱
     */
    public void removeDataSource(String dataSourceName) {
        DataSource remove = dataSourceMap.remove(dataSourceName);
        if (remove != null) {
            log.warn("remove data source({})' successfully.", dataSourceName);
        }
    }
}
SpringDatasourceSqlProcessor繼承了AbstractSqlProcessor,其構(gòu)造器注冊名為default的DataSource,其getConnection根據(jù)sqlParams的dataSourceName來獲取連接,validateParams會先校驗指定的dataSource是否存在;它提供了registerDataSource、removeDataSource方法

小結(jié)

AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗,針對sqlParser不為null的會通過sqlParser進(jìn)行解析,接著通過validateSql校驗sql,最后通過executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法。它有兩個實(shí)現(xiàn)類分別是DynamicDatasourceSqlProcessor(通過jdbcUrl來構(gòu)造連接)、SpringDatasourceSqlProcessor(通過給定的dataSource獲取連接)。

以上就是PowerJob AbstractSqlProcessor方法工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob AbstractSqlProcessor的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java selenium處理極驗滑動驗證碼示例

    Java selenium處理極驗滑動驗證碼示例

    本篇文章主要介紹了Java selenium處理極驗滑動驗證碼示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-10-10
  • Spring?MVC?請求映射路徑的配置實(shí)現(xiàn)前后端交互

    Spring?MVC?請求映射路徑的配置實(shí)現(xiàn)前后端交互

    在Spring?MVC中,請求映射路徑是指與特定的請求處理方法關(guān)聯(lián)的URL路徑,這篇文章主要介紹了Spring?MVC?請求映射路徑的配置,實(shí)現(xiàn)前后端交互,需要的朋友可以參考下
    2023-09-09
  • Springboot @Configuration與自動配置詳解

    Springboot @Configuration與自動配置詳解

    這篇文章主要介紹了SpringBoot中的@Configuration自動配置,在進(jìn)行項目編寫前,我們還需要知道一個東西,就是SpringBoot對我們的SpringMVC還做了哪些配置,包括如何擴(kuò)展,如何定制,只有把這些都搞清楚了,我們在之后使用才會更加得心應(yīng)手
    2022-07-07
  • Java基礎(chǔ)教程之封裝與接口

    Java基礎(chǔ)教程之封裝與接口

    這篇文章主要介紹了Java基礎(chǔ)教程之封裝與接口,本文用淺顯易懂的語言講解了Java中的封裝與接口,很形象的說明了這兩個面向?qū)ο笮g(shù)語,需要的朋友可以參考下
    2014-08-08
  • Java的接口調(diào)用時的權(quán)限驗證功能的實(shí)現(xiàn)

    Java的接口調(diào)用時的權(quán)限驗證功能的實(shí)現(xiàn)

    這篇文章主要介紹了Java的接口調(diào)用時的權(quán)限驗證功能的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • 如何在JAVA中使用Synchronized

    如何在JAVA中使用Synchronized

    這篇文章主要介紹了如何在JAVA中使用Synchronized,文中代碼非常詳細(xì),對大家的學(xué)習(xí)有所幫助,感興趣的朋友可以參考下
    2020-06-06
  • java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法

    java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法

    這篇文章主要介紹了java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • Spring Boot使用Allatori代碼混淆的方法

    Spring Boot使用Allatori代碼混淆的方法

    這篇文章主要介紹了Spring Boot使用Allatori代碼混淆的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-03-03
  • 詳解JAVA 反射機(jī)制

    詳解JAVA 反射機(jī)制

    這篇文章主要介紹了JAVA 反射機(jī)制的相關(guān)知識,文中講解的非常細(xì)致,代碼幫助大家更好的理解學(xué)習(xí),感興趣的朋友可以了解下
    2020-06-06
  • 淺談Java引用和Threadlocal的那些事

    淺談Java引用和Threadlocal的那些事

    這篇文章主要介紹了Java引用和Threadlocal的那些事,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-03-03

最新評論