PowerJob?AbstractSqlProcessor方法工作流程源碼解讀
序
本文主要研究一下PowerJob的AbstractSqlProcessor
AbstractSqlProcessor
tech/powerjob/official/processors/impl/sql/AbstractSqlProcessor.java
@Slf4j
public abstract class AbstractSqlProcessor extends CommonBasicProcessor {
/**
* 默認(rèn)超時(shí)時(shí)間
*/
protected static final int DEFAULT_TIMEOUT = 60;
/**
* name => SQL validator
* 注意 :
* - 返回 true 表示驗(yàn)證通過
* - 返回 false 表示 SQL 非法,將被拒絕執(zhí)行
*/
protected final Map<String, Predicate<String>> sqlValidatorMap = Maps.newConcurrentMap();
/**
* 自定義 SQL 解析器
*/
protected SqlParser sqlParser;
private static final Joiner JOINER = Joiner.on("|").useForNull("-");
@Override
public ProcessResult process0(TaskContext taskContext) {
OmsLogger omsLogger = taskContext.getOmsLogger();
// 解析參數(shù)
SqlParams sqlParams = extractParams(taskContext);
omsLogger.info("origin sql params: {}", JSON.toJSON(sqlParams));
// 校驗(yàn)參數(shù)
validateParams(sqlParams);
StopWatch stopWatch = new StopWatch(this.getClass().getSimpleName());
// 解析
stopWatch.start("Parse SQL");
if (sqlParser != null) {
omsLogger.info("before parse sql: {}", sqlParams.getSql());
String newSQL = sqlParser.parse(sqlParams.getSql(), taskContext);
sqlParams.setSql(newSQL);
omsLogger.info("after parse sql: {}", newSQL);
}
stopWatch.stop();
// 校驗(yàn) SQL
stopWatch.start("Validate SQL");
validateSql(sqlParams.getSql(), omsLogger);
stopWatch.stop();
// 執(zhí)行
stopWatch.start("Execute SQL");
omsLogger.info("final sql params: {}", JSON.toJSON(sqlParams));
executeSql(sqlParams, taskContext);
stopWatch.stop();
omsLogger.info(stopWatch.prettyPrint());
String message = String.format("execute successfully, used time: %s millisecond", stopWatch.getTotalTimeMillis());
return new ProcessResult(true, message);
}
abstract Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException;
public void setSqlParser(SqlParser sqlParser) {
this.sqlParser = sqlParser;
}
public void registerSqlValidator(String validatorName, Predicate<String> sqlValidator) {
sqlValidatorMap.put(validatorName, sqlValidator);
log.info("register sql validator({})' successfully.", validatorName);
}
//......
}AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗(yàn),針對(duì)sqlParser不為null的會(huì)通過sqlParser進(jìn)行解析,接著通過validateSql校驗(yàn)sql,最后通過executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法
SqlParams
@Data
public static class SqlParams {
/**
* 數(shù)據(jù)源名稱
*/
private String dataSourceName;
/**
* 需要執(zhí)行的 SQL
*/
private String sql;
/**
* 超時(shí)時(shí)間
*/
private Integer timeout;
/**
* jdbc url
* 具體格式可參考 https://www.baeldung.com/java-jdbc-url-format
*/
private String jdbcUrl;
/**
* 是否展示 SQL 執(zhí)行結(jié)果
*/
private boolean showResult;
}SqlParams定義了dataSourceName、sql、timeout、jdbcUrl、showResult屬性
validateSql
private void validateSql(String sql, OmsLogger omsLogger) {
if (sqlValidatorMap.isEmpty()) {
return;
}
for (Map.Entry<String, Predicate<String>> entry : sqlValidatorMap.entrySet()) {
Predicate<String> validator = entry.getValue();
if (!validator.test(sql)) {
omsLogger.error("validate sql by validator[{}] failed, skip to process!", entry.getKey());
throw new IllegalArgumentException("illegal sql, can't pass the validation of " + entry.getKey());
}
}
}validateSql遍歷sqlValidatorMap,挨個(gè)執(zhí)行test方法,驗(yàn)證不通過拋出IllegalArgumentException
executeSql
@SneakyThrows
private void executeSql(SqlParams sqlParams, TaskContext ctx) {
OmsLogger omsLogger = ctx.getOmsLogger();
boolean originAutoCommitFlag ;
try (Connection connection = getConnection(sqlParams, ctx)) {
originAutoCommitFlag = connection.getAutoCommit();
connection.setAutoCommit(false);
try (Statement statement = connection.createStatement()) {
statement.setQueryTimeout(sqlParams.getTimeout() == null ? DEFAULT_TIMEOUT : sqlParams.getTimeout());
statement.execute(sqlParams.getSql());
connection.commit();
if (sqlParams.showResult) {
outputSqlResult(statement, omsLogger);
}
} catch (Throwable e) {
omsLogger.error("execute sql failed, try to rollback", e);
connection.rollback();
throw e;
} finally {
connection.setAutoCommit(originAutoCommitFlag);
}
}
}executeSql通過getConnection獲取連接,設(shè)置為手動(dòng)提交,然后創(chuàng)建Statement,設(shè)置queryTimeout,執(zhí)行,最后提交,針對(duì)showResult的執(zhí)行outputSqlResult
outputSqlResult
private void outputSqlResult(Statement statement, OmsLogger omsLogger) throws SQLException {
omsLogger.info("====== SQL EXECUTE RESULT ======");
for (int index = 0; index < Integer.MAX_VALUE; index++) {
// 某一個(gè)結(jié)果集
ResultSet resultSet = statement.getResultSet();
if (resultSet != null) {
try (ResultSet rs = resultSet) {
int columnCount = rs.getMetaData().getColumnCount();
List<String> columnNames = Lists.newLinkedList();
//column – the first column is 1, the second is 2, ...
for (int i = 1; i <= columnCount; i++) {
columnNames.add(rs.getMetaData().getColumnName(i));
}
omsLogger.info("[Result-{}] [Columns] {}" + System.lineSeparator(), index, JOINER.join(columnNames));
int rowIndex = 0;
List<Object> row = Lists.newLinkedList();
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
row.add(rs.getObject(i));
}
omsLogger.info("[Result-{}] [Row-{}] {}" + System.lineSeparator(), index, rowIndex++, JOINER.join(row));
}
}
} else {
int updateCount = statement.getUpdateCount();
if (updateCount != -1) {
omsLogger.info("[Result-{}] update count: {}", index, updateCount);
}
}
if (((!statement.getMoreResults()) && (statement.getUpdateCount() == -1))) {
break;
}
}
omsLogger.info("====== SQL EXECUTE RESULT ======");
}outputSqlResult從statement獲取resultSet,然后打印columnName,在打印每行數(shù)據(jù),對(duì)于更新操作則打印updateCount
SqlParser
@FunctionalInterface
public interface SqlParser {
/**
* 自定義 SQL 解析邏輯
*
* @param sql 原始 SQL 語句
* @param taskContext 任務(wù)上下文
* @return 解析后的 SQL
*/
String parse(String sql, TaskContext taskContext);
}SqlParser接口定義了parse方法
DynamicDatasourceSqlProcessor
tech/powerjob/official/processors/impl/sql/DynamicDatasourceSqlProcessor.java
public class DynamicDatasourceSqlProcessor extends AbstractSqlProcessor {
@Override
protected void validateParams(SqlParams sqlParams) {
if (StringUtils.isEmpty(sqlParams.getJdbcUrl())) {
throw new IllegalArgumentException("jdbcUrl can't be empty in DynamicDatasourceSqlProcessor!");
}
}
@Override
Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {
JSONObject params = JSONObject.parseObject(CommonUtils.parseParams(taskContext));
Properties properties = new Properties();
// normally at least a "user" and "password" property should be included
params.forEach((k, v) -> properties.setProperty(k, String.valueOf(v)));
return DriverManager.getConnection(sqlParams.getJdbcUrl(), properties);
}
@Override
protected String getSecurityDKey() {
return SecurityUtils.ENABLE_DYNAMIC_SQL_PROCESSOR;
}
}DynamicDatasourceSqlProcessor繼承了AbstractSqlProcessor,其validateParams要求jdbcUrl不能為空,其getConnection方法會(huì)從taskContext提取properties作為DriverManager.getConnection的屬性,其getSecurityDKey返回的是powerjob.official-processor.dynamic-datasource.enable配置
SpringDatasourceSqlProcessor
tech/powerjob/official/processors/impl/sql/SpringDatasourceSqlProcessor.java
@Slf4j
public class SpringDatasourceSqlProcessor extends AbstractSqlProcessor {
/**
* 默認(rèn)的數(shù)據(jù)源名稱
*/
private static final String DEFAULT_DATASOURCE_NAME = "default";
/**
* name => data source
*/
private final Map<String, DataSource> dataSourceMap;
/**
* 指定默認(rèn)的數(shù)據(jù)源
*
* @param defaultDataSource 默認(rèn)數(shù)據(jù)源
*/
public SpringDatasourceSqlProcessor(DataSource defaultDataSource) {
dataSourceMap = Maps.newConcurrentMap();
registerDataSource(DEFAULT_DATASOURCE_NAME, defaultDataSource);
}
@Override
Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {
return dataSourceMap.get(sqlParams.getDataSourceName()).getConnection();
}
/**
* 校驗(yàn)參數(shù),如果校驗(yàn)不通過直接拋異常
*
* @param sqlParams SQL 參數(shù)信息
*/
@Override
protected void validateParams(SqlParams sqlParams) {
// 檢查數(shù)據(jù)源
if (StringUtils.isEmpty(sqlParams.getDataSourceName())) {
// use the default data source when current data source name is empty
sqlParams.setDataSourceName(DEFAULT_DATASOURCE_NAME);
}
dataSourceMap.computeIfAbsent(sqlParams.getDataSourceName(), dataSourceName -> {
throw new IllegalArgumentException("can't find data source with name " + dataSourceName);
});
}
/**
* 注冊(cè)數(shù)據(jù)源
*
* @param dataSourceName 數(shù)據(jù)源名稱
* @param dataSource 數(shù)據(jù)源
*/
public void registerDataSource(String dataSourceName, DataSource dataSource) {
Objects.requireNonNull(dataSourceName, "DataSource name must not be null");
Objects.requireNonNull(dataSource, "DataSource must not be null");
dataSourceMap.put(dataSourceName, dataSource);
log.info("register data source({})' successfully.", dataSourceName);
}
/**
* 移除數(shù)據(jù)源
*
* @param dataSourceName 數(shù)據(jù)源名稱
*/
public void removeDataSource(String dataSourceName) {
DataSource remove = dataSourceMap.remove(dataSourceName);
if (remove != null) {
log.warn("remove data source({})' successfully.", dataSourceName);
}
}
}SpringDatasourceSqlProcessor繼承了AbstractSqlProcessor,其構(gòu)造器注冊(cè)名為default的DataSource,其getConnection根據(jù)sqlParams的dataSourceName來獲取連接,validateParams會(huì)先校驗(yàn)指定的dataSource是否存在;它提供了registerDataSource、removeDataSource方法
小結(jié)
AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗(yàn),針對(duì)sqlParser不為null的會(huì)通過sqlParser進(jìn)行解析,接著通過validateSql校驗(yàn)sql,最后通過executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法。它有兩個(gè)實(shí)現(xiàn)類分別是DynamicDatasourceSqlProcessor(通過jdbcUrl來構(gòu)造連接)、SpringDatasourceSqlProcessor(通過給定的dataSource獲取連接)。
以上就是PowerJob AbstractSqlProcessor方法工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob AbstractSqlProcessor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java selenium處理極驗(yàn)滑動(dòng)驗(yàn)證碼示例
本篇文章主要介紹了Java selenium處理極驗(yàn)滑動(dòng)驗(yàn)證碼示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-10-10
Spring?MVC?請(qǐng)求映射路徑的配置實(shí)現(xiàn)前后端交互
在Spring?MVC中,請(qǐng)求映射路徑是指與特定的請(qǐng)求處理方法關(guān)聯(lián)的URL路徑,這篇文章主要介紹了Spring?MVC?請(qǐng)求映射路徑的配置,實(shí)現(xiàn)前后端交互,需要的朋友可以參考下2023-09-09
Springboot @Configuration與自動(dòng)配置詳解
這篇文章主要介紹了SpringBoot中的@Configuration自動(dòng)配置,在進(jìn)行項(xiàng)目編寫前,我們還需要知道一個(gè)東西,就是SpringBoot對(duì)我們的SpringMVC還做了哪些配置,包括如何擴(kuò)展,如何定制,只有把這些都搞清楚了,我們?cè)谥笫褂貌艜?huì)更加得心應(yīng)手2022-07-07
Java的接口調(diào)用時(shí)的權(quán)限驗(yàn)證功能的實(shí)現(xiàn)
這篇文章主要介紹了Java的接口調(diào)用時(shí)的權(quán)限驗(yàn)證功能的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法
這篇文章主要介紹了java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09

