SpringBatch結合SpringBoot簡單使用實現(xiàn)工資發(fā)放批處理操作方式
最近有接觸到批處理相關的需求,學習了下SpringBatch的使用方法。SpringBatch能把復雜的批處理任務進行step分解,并能通過reader和writer滿足不同來源數(shù)據(jù)的處理需求,支持在step定義時設置異常重試策略等,比較方便拓展。
簡單記錄下基于SpringBoot寫的使用demo。
需求
兩張表,user_with_role和role_num,分別有user信息和工資流水信息,role_num冗余。
user_with_role表,包含員工信息和role字段信息
CREATE TABLE `user_with_role` ( `id` INT NOT NULL AUTO_INCREMENT, `username` VARCHAR(45) NULL, `role` VARCHAR(45) NULL, PRIMARY KEY (`id`));
role_num表,包含發(fā)錢信息
CREATE TABLE `role_num1` ( `id` INT NOT NULL AUTO_INCREMENT, `role` VARCHAR(45) NULL, `account` INT NULL, `username` VARCHAR(45) NULL, `inputtime` VARCHAR(45) NULL, PRIMARY KEY (`id`));
pom設置和application配置文件
在springboot的基礎上使用batch,pom.xml中增加dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>還需要加上jdbc依賴,不過這個一般都有加。
application中增加:
spring:
batch:
initialize-schema: always這個配置主要是讓springbatch自動在數(shù)據(jù)庫中創(chuàng)建運行所需的表,SpringBoot2.7版本后修改為 spring.batch.jdbc.initialize-schema 了,不過我還在用老土2.3,先這樣吧。
bean類
UserWithRole
@Data
public class UserWithRole {
private int id;
private String userName;
private String role;
}RoleNum
@Data
public class RoleNum {
private int id;
private String role;
private int account;
private String userName;
private String inputTime;
}關鍵job配置類
包含reader,writer,step和整體job的配置,通過@Bean注解的方法來進行spring管理,下面一步步來。
@Configuration
@EnableBatchProcessing
// batch job設置
// 將user_with_role中的role_user工資信息,輸出到role_num表中
public class RoleCountingBatchJobConfig {
//注入任務對象工廠
@Autowired
private JobBuilderFactory jobBuilderFactory;
//任務的執(zhí)行由Step決定,注入step對象的factory
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Autowired
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;SpringBatch默認會構建兩個Factory用來進行job構建,DataSource默認為Spring配置的數(shù)據(jù)庫,NamedParameterJdbcTemplate由Spring提供,用來將statement中的問號占位符改為利用命名參數(shù)映射更方便,不過我后面沒用。
// 設置itemReader
@Bean
public JdbcCursorItemReader<UserWithRole> getItemReader(){
JdbcCursorItemReader<UserWithRole> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);//設置數(shù)據(jù)源
// 實體映射
itemReader.setRowMapper(new RowMapper<UserWithRole>() {
@Override
public UserWithRole mapRow(ResultSet rs, int rowNum) throws SQLException {
UserWithRole uwr = new UserWithRole();
uwr.setId(rs.getInt("id"));
uwr.setUserName(rs.getString("userName"));
uwr.setRole(rs.getString("role"));
return uwr;
}
});
String sql = "select u.id as id, u.username as userName, u.role as role from user_with_role as u";
itemReader.setSql(sql);
return itemReader;
}itemReader 用的是 JdbcCursorItemReader ,用來從user_with_role表中讀取數(shù)據(jù),主要是配置 RowMapper ,將行數(shù)據(jù)封裝成 UserWithRole 對象,sql就是簡單查詢。
// 設置itemWriter
@Bean
public JdbcBatchItemWriter<RoleNum> getItemWriter(){
JdbcBatchItemWriter<RoleNum> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(dataSource);
itemWriter.setJdbcTemplate(namedParameterJdbcTemplate);
String sql = "insert into role_num(role, account, username, inputtime) values(?, ?, ?, ?)";
itemWriter.setSql(sql);
itemWriter.setItemPreparedStatementSetter(new RoleNumPreparedStatementSetter());
return itemWriter;
}itemWriter 用的是 JdbcBatchItemWriter ,用來把RoleNum對象存到數(shù)據(jù)庫中,主要的參數(shù)寫入邏輯寫在 RoleNumPreparedStatementSetter 類中。
public class RoleNumPreparedStatementSetter implements ItemPreparedStatementSetter<RoleNum> {
// 將role_num的批處理計算結果存入的sql
@Override
public void setValues(RoleNum roleNum, PreparedStatement preparedStatement) throws SQLException {
preparedStatement.setString(1, roleNum.getRole());
preparedStatement.setInt(2, roleNum.getAccount());
preparedStatement.setString(3, roleNum.getUserName());
preparedStatement.setString(4, roleNum.getInputTime());
}
}RoleNumPreparedStatementSetter 主要是將參數(shù)通過占位符放到sql中。
// 設置process
@Bean
public RoleUserCountingProcess getProcess(){
return new RoleUserCountingProcess();
}public class RoleUserCountingProcess implements ItemProcessor<UserWithRole, RoleNum> {
// 將user的信息轉換成roleNum信息,當作發(fā)工資
@Override
public RoleNum process(UserWithRole userWithRole) throws Exception {
RoleNum newRoleNum = new RoleNum();
newRoleNum.setUserName(userWithRole.getUserName());
newRoleNum.setRole(userWithRole.getRole());
newRoleNum.setInputTime(new LocalDateTime(new Date()).toString());
if(userWithRole.getRole()==null)
return newRoleNum;
switch (userWithRole.getRole()) {
case "employee":
newRoleNum.setAccount(100);
break;
case "manager":
newRoleNum.setAccount(10000);
break;
case "boss":
newRoleNum.setAccount(100000);
break;
}
System.out.println(newRoleNum.getUserName() + "---" + newRoleNum.getAccount());
return newRoleNum;
}
}process中定義了具體的工資寫入邏輯,實現(xiàn)了 ItemProcess 接口的process方法,將itemReader的輸出UserWithRole類和itemWriter的輸入RoleNum類進行連接,從而完成process過程。
// 設置step
@Bean
public Step getStep(){
return stepBuilderFactory.get("user_role_convert_step")
.<UserWithRole, RoleNum>chunk(10)
.reader(getItemReader())
.processor(getProcess())
.writer(getItemWriter())
.build();
}
// 獲取job對象
@Bean
public Job RoleCountingBatchJob(JobBuilderFactory jobBuilders,
StepBuilderFactory stepBuilders){
return jobBuilders.get("user_role_convert_step")
.start(getStep())
.build();
}最后定義step和job整體流程。
在配置文件中加入:
spring:
batch:
job:
enabled: false可以阻止job在項目啟動時自動執(zhí)行。
controller啟動jobInstance
@Controller
public class BatchRunnerController {
@Autowired
JobLauncher jobLauncher;
// 自定義job任務
@Autowired
Job roleCountingBatchJob;
@RequestMapping("/countingRoleUser")
@ResponseBody
public String countingRollUser() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
// 設置時間parameter來區(qū)分instance
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addDate("startDate", new Date());
jobLauncher.run(roleCountingBatchJob, jobParametersBuilder.toJobParameters());
return "role user counting batch success --- " +
Objects.requireNonNull(jobParametersBuilder.toJobParameters().getDate("startDate")).toString();
}
}利用 jobLauncher 來執(zhí)行對應的job,date時間來區(qū)分不同的jobInstance。
實現(xiàn)效果

user_with_role表

批處理job正常執(zhí)行

duideduide沒有role所以沒發(fā)到工資
總結
本文簡單記錄了下簡單的springbatch的使用,包括item相關,process編寫,step構建和最終的job使用。springbatch還有一點是可以設置出現(xiàn)異常的處理策略,比如容忍數(shù)次異常,調過某些異常等,在真實使用中比較靈活,有機會再補充。
好了,以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
你必須得會的SpringBoot全局統(tǒng)一處理異常詳解
程序在運行的過程中,不可避免會產(chǎn)生各種各樣的錯誤,這個時候就需要進行異常處理,本文主要為大家介紹了SpringBoot實現(xiàn)全局統(tǒng)一處理異常的方法,需要的可以參考一下2023-06-06
Mybatis實現(xiàn)關聯(lián)關系映射的方法示例
本文主要介紹了Mybatis實現(xiàn)關聯(lián)關系映射的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-07-07
Java abstract class 與 interface對比
這篇文章主要介紹了 Java abstract class 與 interface對比的相關資料,需要的朋友可以參考下2016-12-12
Mybatis plus關閉駝峰命名的四種方法(防止出現(xiàn)查詢?yōu)镹ull)
這篇文章主要介紹了Mybatis plus關閉駝峰命名的四種方法(防止出現(xiàn)查詢?yōu)镹ull),數(shù)據(jù)庫的字段命名方式為使用下劃線連接,對應的實體類應該是駝峰命名方式,而我使用的是和數(shù)據(jù)庫同樣的命名方式,需要的朋友可以參考下2022-01-01
MyBatis動態(tài)SQL之<choose><when><otherwise>標簽的使用
MyBatis中動態(tài)語句choose-when-otherwise 類似于Java中的switch-case-default語句,本文就來介紹一下MyBatis動態(tài)SQL之<choose><when><otherwise>標簽的使用,感興趣的可以了解一下2023-09-09
解決IDEA創(chuàng)建第一個spring boot項目提示cannot resolve xxx等
這篇文章主要介紹了解決IDEA創(chuàng)建第一個spring boot項目提示cannot resolve xxx等錯誤問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01

