springbatch的封裝與使用實例詳解
Spring Batch官網(wǎng)介紹:
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款輕量的、全面的批處理框架,用于開發(fā)強大的日常運營的企業(yè)級批處理應(yīng)用程序。)
springbatch
主要實現(xiàn)批量數(shù)據(jù)的處理,我對batch進行的封裝,提出了jobBase類型,具體job需要實現(xiàn)它即可。Spring Batch 不僅提供了統(tǒng)一的讀寫接口、豐富的任務(wù)處理方式、靈活的事務(wù)管理及并發(fā)處理,同時還支持日志、監(jiān)控、任務(wù)重啟與跳過等特性,大大簡化了批處理應(yīng)用開發(fā),將開發(fā)人員從復(fù)雜的任務(wù)配置管理過程中解放出來,使他們可以更多地去關(guān)注核心的業(yè)務(wù)處理過程。
幾個組件
•job
•step
•read
•write
•listener
•process
•validator
JobBase定義了幾個公用的方法
/**
* springBatch的job基礎(chǔ)類.
*/
public abstract class JobBase<T> {
/**
* 批次.
*/
protected int chunkCount = 5000;
/**
* 監(jiān)聽器.
*/
private JobExecutionListener jobExecutionListener;
/**
* 處理器.
*/
private ValidatingItemProcessor<T> validatingItemProcessor;
/**
* job名稱.
*/
private String jobName;
/**
* 檢驗器.
*/
private Validator<T> validator;
@Autowired
private JobBuilderFactory job;
@Autowired
private StepBuilderFactory step;
/**
* 初始化.
*
* @param jobName job名稱
* @param jobExecutionListener 監(jiān)聽器
* @param validatingItemProcessor 處理器
* @param validator 檢驗
*/
public JobBase(String jobName,
JobExecutionListener jobExecutionListener,
ValidatingItemProcessor<T> validatingItemProcessor,
Validator<T> validator) {
this.jobName = jobName;
this.jobExecutionListener = jobExecutionListener;
this.validatingItemProcessor = validatingItemProcessor;
this.validator = validator;
}
/**
* job初始化與啟動.
*/
public Job getJob() throws Exception {
return job.get(jobName).incrementer(new RunIdIncrementer())
.start(syncStep())
.listener(jobExecutionListener)
.build();
}
/**
* 執(zhí)行步驟.
*
* @return
*/
public Step syncStep() throws Exception {
return step.get("step1")
.<T, T>chunk(chunkCount)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
/**
* 單條處理數(shù)據(jù).
*
* @return
*/
public ItemProcessor<T, T> processor() {
validatingItemProcessor.setValidator(processorValidator());
return validatingItemProcessor;
}
/**
* 校驗數(shù)據(jù).
*
* @return
*/
@Bean
public Validator<T> processorValidator() {
return validator;
}
/**
* 批量讀數(shù)據(jù).
*
* @return
* @throws Exception
*/
public abstract ItemReader<T> reader() throws Exception;
/**
* 批量寫數(shù)據(jù).
*
* @return
*/
@Bean
public abstract ItemWriter<T> writer();
}
主要規(guī)定了公用方法的執(zhí)行策略,而具體的job名稱,讀,寫還是需要具體JOB去實現(xiàn)的。
具體Job實現(xiàn)
@Configuration
@EnableBatchProcessing
public class SyncPersonJob extends JobBase<Person> {
@Autowired
private DataSource dataSource;
@Autowired
@Qualifier("primaryJdbcTemplate")
private JdbcTemplate jdbcTemplate;
/**
* 初始化,規(guī)則了job名稱和監(jiān)視器.
*/
public SyncPersonJob() {
super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>());
}
@Override
public ItemReader<Person> reader() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("select * from person");
String sql = sb.toString();
JdbcCursorItemReader<Person> jdbcCursorItemReader =
new JdbcCursorItemReader<>();
jdbcCursorItemReader.setSql(sql);
jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
jdbcCursorItemReader.setDataSource(dataSource);
return jdbcCursorItemReader;
}
@Override
@Bean("personJobWriter")
public ItemWriter<Person> writer() {
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
String sql = "insert into person_export " + "(id,name,age,nation,address) "
+ "values(:id, :name, :age, :nation,:address)";
writer.setSql(sql);
writer.setDataSource(dataSource);
return writer;
}
}
寫操作需要定義自己的bean的聲明
注意,需要為每個job的write啟個名稱,否則在多job時,write將會被打亂
/**
* 批量寫數(shù)據(jù).
*
* @return
*/
@Override
@Bean("personVerson2JobWriter")
public ItemWriter<Person> writer() {
}
添加一個api,手動觸發(fā)
@Autowired
SyncPersonJob syncPersonJob;
@Autowired
JobLauncher jobLauncher;
void exec(Job job) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
@RequestMapping("/run1")
public String run1() throws Exception {
exec(syncPersonJob.getJob());
return "personJob success";
}
總結(jié)
以上所述是小編給大家介紹的springbatch的封裝與使用實例詳解,希望對大家有所幫助,如果大家有任何疑問歡迎給我留言,小編會及時回復(fù)大家的!
相關(guān)文章
約定優(yōu)于配置_動力節(jié)點Java學(xué)院整理
以前做項目,總是寫Ant配置文件,滿足于自己更靈活的配置,而沒有去思考,這么做到底值不值得2017-08-08
java圖的深度優(yōu)先遍歷實現(xiàn)隨機生成迷宮
這篇文章主要為大家詳細介紹了java圖的深度優(yōu)先遍歷實現(xiàn)隨機生成迷宮,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01
Mybatis Plus 字段為空值時執(zhí)行更新方法未更新解決方案
這篇文章主要介紹了Mybatis Plus 字段為空值時執(zhí)行更新方法未更新解決方案,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
Springboot注入成員變量HttpServletRequest的原理分析
這篇文章主要介紹了Springboot注入成員變量HttpServletRequest的原理分析,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05
Linux中使用shell腳本管理Java應(yīng)用程序
在日常開發(fā)和運維工作中,管理基于Java的應(yīng)用程序是一項基礎(chǔ)且頻繁的任務(wù),本文將通過一個示例腳本,展示如何利用Shell腳本簡化這一流程,實現(xiàn)Java應(yīng)用的一鍵式啟動、停止與重啟操作,本腳本不僅提升了工作效率,還確保了操作的標準化與可靠性2024-06-06

