springbatch的封裝與使用實(shí)例詳解
Spring Batch官網(wǎng)介紹:
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款輕量的、全面的批處理框架,用于開發(fā)強(qiáng)大的日常運(yùn)營的企業(yè)級批處理應(yīng)用程序。)
springbatch
主要實(shí)現(xiàn)批量數(shù)據(jù)的處理,我對batch進(jìn)行的封裝,提出了jobBase類型,具體job需要實(shí)現(xiàn)它即可。Spring Batch 不僅提供了統(tǒng)一的讀寫接口、豐富的任務(wù)處理方式、靈活的事務(wù)管理及并發(fā)處理,同時(shí)還支持日志、監(jiān)控、任務(wù)重啟與跳過等特性,大大簡化了批處理應(yīng)用開發(fā),將開發(fā)人員從復(fù)雜的任務(wù)配置管理過程中解放出來,使他們可以更多地去關(guān)注核心的業(yè)務(wù)處理過程。
幾個(gè)組件
•job
•step
•read
•write
•listener
•process
•validator
JobBase定義了幾個(gè)公用的方法
/** * springBatch的job基礎(chǔ)類. */ public abstract class JobBase<T> { /** * 批次. */ protected int chunkCount = 5000; /** * 監(jiān)聽器. */ private JobExecutionListener jobExecutionListener; /** * 處理器. */ private ValidatingItemProcessor<T> validatingItemProcessor; /** * job名稱. */ private String jobName; /** * 檢驗(yàn)器. */ private Validator<T> validator; @Autowired private JobBuilderFactory job; @Autowired private StepBuilderFactory step; /** * 初始化. * * @param jobName job名稱 * @param jobExecutionListener 監(jiān)聽器 * @param validatingItemProcessor 處理器 * @param validator 檢驗(yàn) */ public JobBase(String jobName, JobExecutionListener jobExecutionListener, ValidatingItemProcessor<T> validatingItemProcessor, Validator<T> validator) { this.jobName = jobName; this.jobExecutionListener = jobExecutionListener; this.validatingItemProcessor = validatingItemProcessor; this.validator = validator; } /** * job初始化與啟動. */ public Job getJob() throws Exception { return job.get(jobName).incrementer(new RunIdIncrementer()) .start(syncStep()) .listener(jobExecutionListener) .build(); } /** * 執(zhí)行步驟. * * @return */ public Step syncStep() throws Exception { return step.get("step1") .<T, T>chunk(chunkCount) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } /** * 單條處理數(shù)據(jù). * * @return */ public ItemProcessor<T, T> processor() { validatingItemProcessor.setValidator(processorValidator()); return validatingItemProcessor; } /** * 校驗(yàn)數(shù)據(jù). * * @return */ @Bean public Validator<T> processorValidator() { return validator; } /** * 批量讀數(shù)據(jù). * * @return * @throws Exception */ public abstract ItemReader<T> reader() throws Exception; /** * 批量寫數(shù)據(jù). * * @return */ @Bean public abstract ItemWriter<T> writer(); }
主要規(guī)定了公用方法的執(zhí)行策略,而具體的job名稱,讀,寫還是需要具體JOB去實(shí)現(xiàn)的。
具體Job實(shí)現(xiàn)
@Configuration @EnableBatchProcessing public class SyncPersonJob extends JobBase<Person> { @Autowired private DataSource dataSource; @Autowired @Qualifier("primaryJdbcTemplate") private JdbcTemplate jdbcTemplate; /** * 初始化,規(guī)則了job名稱和監(jiān)視器. */ public SyncPersonJob() { super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>()); } @Override public ItemReader<Person> reader() throws Exception { StringBuffer sb = new StringBuffer(); sb.append("select * from person"); String sql = sb.toString(); JdbcCursorItemReader<Person> jdbcCursorItemReader = new JdbcCursorItemReader<>(); jdbcCursorItemReader.setSql(sql); jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class)); jdbcCursorItemReader.setDataSource(dataSource); return jdbcCursorItemReader; } @Override @Bean("personJobWriter") public ItemWriter<Person> writer() { JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); String sql = "insert into person_export " + "(id,name,age,nation,address) " + "values(:id, :name, :age, :nation,:address)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; } }
寫操作需要定義自己的bean的聲明
注意,需要為每個(gè)job的write啟個(gè)名稱,否則在多job時(shí),write將會被打亂
/** * 批量寫數(shù)據(jù). * * @return */ @Override @Bean("personVerson2JobWriter") public ItemWriter<Person> writer() { }
添加一個(gè)api,手動觸發(fā)
@Autowired SyncPersonJob syncPersonJob; @Autowired JobLauncher jobLauncher; void exec(Job job) throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(job, jobParameters); } @RequestMapping("/run1") public String run1() throws Exception { exec(syncPersonJob.getJob()); return "personJob success"; }
總結(jié)
以上所述是小編給大家介紹的springbatch的封裝與使用實(shí)例詳解,希望對大家有所幫助,如果大家有任何疑問歡迎給我留言,小編會及時(shí)回復(fù)大家的!
相關(guān)文章
約定優(yōu)于配置_動力節(jié)點(diǎn)Java學(xué)院整理
以前做項(xiàng)目,總是寫Ant配置文件,滿足于自己更靈活的配置,而沒有去思考,這么做到底值不值得2017-08-08java圖的深度優(yōu)先遍歷實(shí)現(xiàn)隨機(jī)生成迷宮
這篇文章主要為大家詳細(xì)介紹了java圖的深度優(yōu)先遍歷實(shí)現(xiàn)隨機(jī)生成迷宮,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01Java多態(tài)用法與注意點(diǎn)實(shí)例分析
這篇文章主要介紹了Java多態(tài)用法與注意點(diǎn),結(jié)合實(shí)例形式分析了java多態(tài)相關(guān)的向上轉(zhuǎn)型、向下轉(zhuǎn)型、隱藏等相關(guān)操作技巧,需要的朋友可以參考下2019-08-08Mybatis Plus 字段為空值時(shí)執(zhí)行更新方法未更新解決方案
這篇文章主要介紹了Mybatis Plus 字段為空值時(shí)執(zhí)行更新方法未更新解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09Jenkins如何實(shí)現(xiàn)自動打包部署linux
這篇文章主要介紹了Jenkins如何實(shí)現(xiàn)自動打包部署linux,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11Springboot注入成員變量HttpServletRequest的原理分析
這篇文章主要介紹了Springboot注入成員變量HttpServletRequest的原理分析,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05Linux中使用shell腳本管理Java應(yīng)用程序
在日常開發(fā)和運(yùn)維工作中,管理基于Java的應(yīng)用程序是一項(xiàng)基礎(chǔ)且頻繁的任務(wù),本文將通過一個(gè)示例腳本,展示如何利用Shell腳本簡化這一流程,實(shí)現(xiàn)Java應(yīng)用的一鍵式啟動、停止與重啟操作,本腳本不僅提升了工作效率,還確保了操作的標(biāo)準(zhǔn)化與可靠性2024-06-06