PowerJob?AbstractSqlProcessor方法工作流程源碼解讀
序
本文主要研究一下PowerJob的AbstractSqlProcessor
AbstractSqlProcessor
tech/powerjob/official/processors/impl/sql/AbstractSqlProcessor.java
@Slf4j public abstract class AbstractSqlProcessor extends CommonBasicProcessor { /** * 默認(rèn)超時時間 */ protected static final int DEFAULT_TIMEOUT = 60; /** * name => SQL validator * 注意 : * - 返回 true 表示驗證通過 * - 返回 false 表示 SQL 非法,將被拒絕執(zhí)行 */ protected final Map<String, Predicate<String>> sqlValidatorMap = Maps.newConcurrentMap(); /** * 自定義 SQL 解析器 */ protected SqlParser sqlParser; private static final Joiner JOINER = Joiner.on("|").useForNull("-"); @Override public ProcessResult process0(TaskContext taskContext) { OmsLogger omsLogger = taskContext.getOmsLogger(); // 解析參數(shù) SqlParams sqlParams = extractParams(taskContext); omsLogger.info("origin sql params: {}", JSON.toJSON(sqlParams)); // 校驗參數(shù) validateParams(sqlParams); StopWatch stopWatch = new StopWatch(this.getClass().getSimpleName()); // 解析 stopWatch.start("Parse SQL"); if (sqlParser != null) { omsLogger.info("before parse sql: {}", sqlParams.getSql()); String newSQL = sqlParser.parse(sqlParams.getSql(), taskContext); sqlParams.setSql(newSQL); omsLogger.info("after parse sql: {}", newSQL); } stopWatch.stop(); // 校驗 SQL stopWatch.start("Validate SQL"); validateSql(sqlParams.getSql(), omsLogger); stopWatch.stop(); // 執(zhí)行 stopWatch.start("Execute SQL"); omsLogger.info("final sql params: {}", JSON.toJSON(sqlParams)); executeSql(sqlParams, taskContext); stopWatch.stop(); omsLogger.info(stopWatch.prettyPrint()); String message = String.format("execute successfully, used time: %s millisecond", stopWatch.getTotalTimeMillis()); return new ProcessResult(true, message); } abstract Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException; public void setSqlParser(SqlParser sqlParser) { this.sqlParser = sqlParser; } public void registerSqlValidator(String validatorName, Predicate<String> sqlValidator) { sqlValidatorMap.put(validatorName, sqlValidator); log.info("register sql validator({})' successfully.", validatorName); } //...... }
AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗,針對sqlParser不為null的會通過sqlParser進(jìn)行解析,接著通過validateSql校驗sql,最后通過executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法
SqlParams
@Data public static class SqlParams { /** * 數(shù)據(jù)源名稱 */ private String dataSourceName; /** * 需要執(zhí)行的 SQL */ private String sql; /** * 超時時間 */ private Integer timeout; /** * jdbc url * 具體格式可參考 https://www.baeldung.com/java-jdbc-url-format */ private String jdbcUrl; /** * 是否展示 SQL 執(zhí)行結(jié)果 */ private boolean showResult; }
SqlParams定義了dataSourceName、sql、timeout、jdbcUrl、showResult屬性
validateSql
private void validateSql(String sql, OmsLogger omsLogger) { if (sqlValidatorMap.isEmpty()) { return; } for (Map.Entry<String, Predicate<String>> entry : sqlValidatorMap.entrySet()) { Predicate<String> validator = entry.getValue(); if (!validator.test(sql)) { omsLogger.error("validate sql by validator[{}] failed, skip to process!", entry.getKey()); throw new IllegalArgumentException("illegal sql, can't pass the validation of " + entry.getKey()); } } }
validateSql遍歷sqlValidatorMap,挨個執(zhí)行test方法,驗證不通過拋出IllegalArgumentException
executeSql
@SneakyThrows private void executeSql(SqlParams sqlParams, TaskContext ctx) { OmsLogger omsLogger = ctx.getOmsLogger(); boolean originAutoCommitFlag ; try (Connection connection = getConnection(sqlParams, ctx)) { originAutoCommitFlag = connection.getAutoCommit(); connection.setAutoCommit(false); try (Statement statement = connection.createStatement()) { statement.setQueryTimeout(sqlParams.getTimeout() == null ? DEFAULT_TIMEOUT : sqlParams.getTimeout()); statement.execute(sqlParams.getSql()); connection.commit(); if (sqlParams.showResult) { outputSqlResult(statement, omsLogger); } } catch (Throwable e) { omsLogger.error("execute sql failed, try to rollback", e); connection.rollback(); throw e; } finally { connection.setAutoCommit(originAutoCommitFlag); } } }
executeSql通過getConnection獲取連接,設(shè)置為手動提交,然后創(chuàng)建Statement,設(shè)置queryTimeout,執(zhí)行,最后提交,針對showResult的執(zhí)行outputSqlResult
outputSqlResult
private void outputSqlResult(Statement statement, OmsLogger omsLogger) throws SQLException { omsLogger.info("====== SQL EXECUTE RESULT ======"); for (int index = 0; index < Integer.MAX_VALUE; index++) { // 某一個結(jié)果集 ResultSet resultSet = statement.getResultSet(); if (resultSet != null) { try (ResultSet rs = resultSet) { int columnCount = rs.getMetaData().getColumnCount(); List<String> columnNames = Lists.newLinkedList(); //column – the first column is 1, the second is 2, ... for (int i = 1; i <= columnCount; i++) { columnNames.add(rs.getMetaData().getColumnName(i)); } omsLogger.info("[Result-{}] [Columns] {}" + System.lineSeparator(), index, JOINER.join(columnNames)); int rowIndex = 0; List<Object> row = Lists.newLinkedList(); while (rs.next()) { for (int i = 1; i <= columnCount; i++) { row.add(rs.getObject(i)); } omsLogger.info("[Result-{}] [Row-{}] {}" + System.lineSeparator(), index, rowIndex++, JOINER.join(row)); } } } else { int updateCount = statement.getUpdateCount(); if (updateCount != -1) { omsLogger.info("[Result-{}] update count: {}", index, updateCount); } } if (((!statement.getMoreResults()) && (statement.getUpdateCount() == -1))) { break; } } omsLogger.info("====== SQL EXECUTE RESULT ======"); }
outputSqlResult從statement獲取resultSet,然后打印columnName,在打印每行數(shù)據(jù),對于更新操作則打印updateCount
SqlParser
@FunctionalInterface public interface SqlParser { /** * 自定義 SQL 解析邏輯 * * @param sql 原始 SQL 語句 * @param taskContext 任務(wù)上下文 * @return 解析后的 SQL */ String parse(String sql, TaskContext taskContext); }
SqlParser接口定義了parse方法
DynamicDatasourceSqlProcessor
tech/powerjob/official/processors/impl/sql/DynamicDatasourceSqlProcessor.java
public class DynamicDatasourceSqlProcessor extends AbstractSqlProcessor { @Override protected void validateParams(SqlParams sqlParams) { if (StringUtils.isEmpty(sqlParams.getJdbcUrl())) { throw new IllegalArgumentException("jdbcUrl can't be empty in DynamicDatasourceSqlProcessor!"); } } @Override Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException { JSONObject params = JSONObject.parseObject(CommonUtils.parseParams(taskContext)); Properties properties = new Properties(); // normally at least a "user" and "password" property should be included params.forEach((k, v) -> properties.setProperty(k, String.valueOf(v))); return DriverManager.getConnection(sqlParams.getJdbcUrl(), properties); } @Override protected String getSecurityDKey() { return SecurityUtils.ENABLE_DYNAMIC_SQL_PROCESSOR; } }
DynamicDatasourceSqlProcessor繼承了AbstractSqlProcessor,其validateParams要求jdbcUrl不能為空,其getConnection方法會從taskContext提取properties作為DriverManager.getConnection的屬性,其getSecurityDKey返回的是powerjob.official-processor.dynamic-datasource.enable配置
SpringDatasourceSqlProcessor
tech/powerjob/official/processors/impl/sql/SpringDatasourceSqlProcessor.java
@Slf4j public class SpringDatasourceSqlProcessor extends AbstractSqlProcessor { /** * 默認(rèn)的數(shù)據(jù)源名稱 */ private static final String DEFAULT_DATASOURCE_NAME = "default"; /** * name => data source */ private final Map<String, DataSource> dataSourceMap; /** * 指定默認(rèn)的數(shù)據(jù)源 * * @param defaultDataSource 默認(rèn)數(shù)據(jù)源 */ public SpringDatasourceSqlProcessor(DataSource defaultDataSource) { dataSourceMap = Maps.newConcurrentMap(); registerDataSource(DEFAULT_DATASOURCE_NAME, defaultDataSource); } @Override Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException { return dataSourceMap.get(sqlParams.getDataSourceName()).getConnection(); } /** * 校驗參數(shù),如果校驗不通過直接拋異常 * * @param sqlParams SQL 參數(shù)信息 */ @Override protected void validateParams(SqlParams sqlParams) { // 檢查數(shù)據(jù)源 if (StringUtils.isEmpty(sqlParams.getDataSourceName())) { // use the default data source when current data source name is empty sqlParams.setDataSourceName(DEFAULT_DATASOURCE_NAME); } dataSourceMap.computeIfAbsent(sqlParams.getDataSourceName(), dataSourceName -> { throw new IllegalArgumentException("can't find data source with name " + dataSourceName); }); } /** * 注冊數(shù)據(jù)源 * * @param dataSourceName 數(shù)據(jù)源名稱 * @param dataSource 數(shù)據(jù)源 */ public void registerDataSource(String dataSourceName, DataSource dataSource) { Objects.requireNonNull(dataSourceName, "DataSource name must not be null"); Objects.requireNonNull(dataSource, "DataSource must not be null"); dataSourceMap.put(dataSourceName, dataSource); log.info("register data source({})' successfully.", dataSourceName); } /** * 移除數(shù)據(jù)源 * * @param dataSourceName 數(shù)據(jù)源名稱 */ public void removeDataSource(String dataSourceName) { DataSource remove = dataSourceMap.remove(dataSourceName); if (remove != null) { log.warn("remove data source({})' successfully.", dataSourceName); } } }
SpringDatasourceSqlProcessor繼承了AbstractSqlProcessor,其構(gòu)造器注冊名為default的DataSource,其getConnection根據(jù)sqlParams的dataSourceName來獲取連接,validateParams會先校驗指定的dataSource是否存在;它提供了registerDataSource、removeDataSource方法
小結(jié)
AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗,針對sqlParser不為null的會通過sqlParser進(jìn)行解析,接著通過validateSql校驗sql,最后通過executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法。它有兩個實(shí)現(xiàn)類分別是DynamicDatasourceSqlProcessor(通過jdbcUrl來構(gòu)造連接)、SpringDatasourceSqlProcessor(通過給定的dataSource獲取連接)。
以上就是PowerJob AbstractSqlProcessor方法工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob AbstractSqlProcessor的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring?MVC?請求映射路徑的配置實(shí)現(xiàn)前后端交互
在Spring?MVC中,請求映射路徑是指與特定的請求處理方法關(guān)聯(lián)的URL路徑,這篇文章主要介紹了Spring?MVC?請求映射路徑的配置,實(shí)現(xiàn)前后端交互,需要的朋友可以參考下2023-09-09Springboot @Configuration與自動配置詳解
這篇文章主要介紹了SpringBoot中的@Configuration自動配置,在進(jìn)行項目編寫前,我們還需要知道一個東西,就是SpringBoot對我們的SpringMVC還做了哪些配置,包括如何擴(kuò)展,如何定制,只有把這些都搞清楚了,我們在之后使用才會更加得心應(yīng)手2022-07-07Java的接口調(diào)用時的權(quán)限驗證功能的實(shí)現(xiàn)
這篇文章主要介紹了Java的接口調(diào)用時的權(quán)限驗證功能的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法
這篇文章主要介紹了java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09