詳解批處理框架之Spring Batch
一、Spring Batch的概念知識
1.1、分層架構(gòu)
Spring Batch
的分層架構(gòu)圖如下:
可以看到它分為三層,分別是:
Application
應(yīng)用層:包含了所有任務(wù)batch jobs
和開發(fā)人員自定義的代碼,主要是根據(jù)項(xiàng)目需要開發(fā)的業(yè)務(wù)流程等。Batch Core
核心層:包含啟動和管理任務(wù)的運(yùn)行環(huán)境類,如JobLauncher
等。Batch Infrastructure
基礎(chǔ)層:上面兩層是建立在基礎(chǔ)層之上的,包含基礎(chǔ)的讀入reader
和寫出writer
、重試框架等。
1.2、關(guān)鍵概念
理解下圖所涉及的概念至關(guān)重要,不然很難進(jìn)行后續(xù)開發(fā)和問題分析。
1.2.1、JobRepository
專門負(fù)責(zé)與數(shù)據(jù)庫打交道,對整個(gè)批處理的新增、更新、執(zhí)行進(jìn)行記錄。所以Spring Batch
是需要依賴數(shù)據(jù)庫來管理的。
1.2.2、任務(wù)啟動器JobLauncher
負(fù)責(zé)啟動任務(wù)Job
。
1.2.3、任務(wù)Job
Job
是封裝整個(gè)批處理過程的單位,跑一個(gè)批處理任務(wù),就是跑一個(gè)Job
所定義的內(nèi)容。
上圖介紹了Job
的一些相關(guān)概念:
Job
:封裝處理實(shí)體,定義過程邏輯。JobInstance
:Job
的運(yùn)行實(shí)例,不同的實(shí)例,參數(shù)不同,所以定義好一個(gè)Job
后可以通過不同參數(shù)運(yùn)行多次。JobParameters
:與JobInstance
相關(guān)聯(lián)的參數(shù)。JobExecution
:代表Job
的一次實(shí)際執(zhí)行,可能成功、可能失敗。
所以,開發(fā)人員要做的事情,就是定義Job
。
1.2.4、步驟Step
Step
是對Job
某個(gè)過程的封裝,一個(gè)Job
可以包含一個(gè)或多個(gè)Step
,一步步的Step
按特定邏輯執(zhí)行,才代表Job
執(zhí)行完成。
通過定義Step
來組裝Job
可以更靈活地實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯。
1.2.5、輸入——處理——輸出
所以,定義一個(gè)Job
關(guān)鍵是定義好一個(gè)或多個(gè)Step
,然后把它們組裝好即可。而定義Step
有多種方法,但有一種常用的模型就是輸入——處理——輸出
,即Item Reader
、Item Processor
和Item Writer
。比如通過Item Reader
從文件輸入數(shù)據(jù),然后通過Item Processor
進(jìn)行業(yè)務(wù)處理和數(shù)據(jù)轉(zhuǎn)換,最后通過Item Writer
寫到數(shù)據(jù)庫中去。
Spring Batch
為我們提供了許多開箱即用的Reader
和Writer
,非常方便。
二、代碼實(shí)例
理解了基本概念后,就直接通過代碼來感受一下吧。整個(gè)項(xiàng)目的功能是從多個(gè)csv
文件中讀數(shù)據(jù),處理后輸出到一個(gè)csv
文件。
2.1、基本框架
添加依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
需要添加Spring Batch
的依賴,同時(shí)使用H2
作為內(nèi)存數(shù)據(jù)庫比較方便,實(shí)際生產(chǎn)肯定是要使用外部的數(shù)據(jù)庫,如Oracle
、PostgreSQL
。
入口主類:
@SpringBootApplication @EnableBatchProcessing public class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); } }
也很簡單,只是在Springboot
的基礎(chǔ)上添加注解@EnableBatchProcessing
。
領(lǐng)域?qū)嶓w類Employee
:
package com.pkslow.batch.entity; public class Employee { String id; String firstName; String lastName; }
對應(yīng)的csv
文件內(nèi)容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
2.2、輸入——處理——輸出
2.2.1、讀取ItemReader
因?yàn)橛卸鄠€(gè)輸入文件,所以定義如下:
@Value("input/inputData*.csv") private Resource[] inputResources; @Bean public MultiResourceItemReader<Employee> multiResourceItemReader() { MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader; } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳過csv文件第一行,為表頭 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //轉(zhuǎn)換化后的目標(biāo)類 setTargetType(Employee.class); } }); } }); return reader; }
這里使用了FlatFileItemReader
,方便我們從文件讀取數(shù)據(jù)。
2.2.2、處理ItemProcessor
為了簡單演示,處理很簡單,就是把最后一列轉(zhuǎn)為大寫:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; }; }
2.2.3、輸出ItremWriter
比較簡單,代碼及注釋如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv"); @Bean public FlatFileItemWriter<Employee> writer() { FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否為追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //設(shè)置分割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //設(shè)置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer; }
2.3、Step
有了Reader-Processor-Writer
后,就可以定義Step
了:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build(); }
這里有一個(gè)chunk
的設(shè)置,值為5
,意思是5條記錄后再提交輸出,可以根據(jù)自己需求定義。
2.4、Job
完成了Step
的編碼,定義Job
就容易了:
@Bean public Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build(); }
2.5、運(yùn)行
完成以上編碼后,執(zhí)行程序,結(jié)果如下:
成功讀取數(shù)據(jù),并將最后字段轉(zhuǎn)為大寫,并輸出到outputData.csv
文件。
三、監(jiān)聽Listener
可以通過Listener
接口對特定事件進(jìn)行監(jiān)聽,以實(shí)現(xiàn)更多業(yè)務(wù)功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數(shù)據(jù)等。
我們分別對Read
、Process
和Write
事件進(jìn)行監(jiān)聽,對應(yīng)分別要實(shí)現(xiàn)ItemReadListener
接口、ItemProcessListener
接口和ItemWriteListener
接口。因?yàn)榇a比較簡單,就是打印一下日志,這里只貼出ItemWriteListener
的實(shí)現(xiàn)代碼:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); } }
把實(shí)現(xiàn)的監(jiān)聽器listener
整合到Step
中去:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build(); }
執(zhí)行后看一下日志:
這里就能明顯看到之前設(shè)置的chunk
的作用了。Writer
每次是處理5條記錄,如果一條輸出一次,會對IO
造成壓力。
以上就是詳解Spring Batch入門之優(yōu)秀的批處理框架的詳細(xì)內(nèi)容,更多關(guān)于Spring Batch 批處理框架的資料請關(guān)注腳本之家其它相關(guān)文章!
- 源碼解析springbatch的job運(yùn)行機(jī)制
- spring中jdbcTemplate.batchUpdate的幾種使用情況
- spring?batch線上異常定位記錄
- Spring Batch 如何自定義ItemReader
- 手把手教你搭建第一個(gè)Spring Batch項(xiàng)目的步驟
- 基于Spring Batch向Elasticsearch批量導(dǎo)入數(shù)據(jù)示例
- Spring Batch入門教程篇
- Spring Batch讀取txt文件并寫入數(shù)據(jù)庫的方法教程
- 使用Spring?Batch實(shí)現(xiàn)大數(shù)據(jù)處理的操作方法
相關(guān)文章
MyBatis中一對多的xml配置方式(嵌套查詢/嵌套結(jié)果)
這篇文章主要介紹了MyBatis中一對多的xml配置方式(嵌套查詢/嵌套結(jié)果),具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03Maven將Jar包打入本地倉庫的實(shí)現(xiàn)
項(xiàng)目需要用到一個(gè)Jar包,不能從遠(yuǎn)程倉庫拉取,只有一個(gè)Jar包,所以需要將Jar包打入到本地倉庫才能引入項(xiàng)目,本文主要介紹了Maven將Jar包打入本地倉庫的實(shí)現(xiàn),感興趣的可以了解一下2023-12-12SpringBoot全局異常處理之多個(gè)處理器匹配順序(最新推薦)
這篇文章主要介紹了SpringBoot全局異常處理之多個(gè)處理器匹配順序(最新推薦),調(diào)試源碼可見匹配順序?yàn)椋寒惓蛹壐哒邇?yōu)先,再清楚點(diǎn),子類異常處理器優(yōu)先,本文給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2024-03-03Java集合的組內(nèi)平均值的計(jì)算方法總結(jié)
在Java中,經(jīng)常需要對集合進(jìn)行各種操作,其中之一就是計(jì)算集合的組內(nèi)平均值,本文將介紹如何使用Java集合來計(jì)算組內(nèi)平均值,并提供一些示例代碼和實(shí)用技巧2024-08-08解決IntelliJ IDEA創(chuàng)建spring boot無法連接http://start.spring.io/問題
這篇文章主要介紹了解決IntelliJ IDEA創(chuàng)建spring boot無法連接http://start.spring.io/問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08全面解釋java中StringBuilder、StringBuffer、String類之間的關(guān)系
String的值是不可變的,這就導(dǎo)致每次對String的操作都會生成新的String對象,不僅效率低下,而且大量浪費(fèi)有限的內(nèi)存空間,StringBuffer是可變類,和線程安全的字符串操作類,任何對它指向的字符串的操作都不會產(chǎn)生新的對象,StringBuffer和StringBuilder類功能基本相似2013-01-01Win10系統(tǒng)下配置Java環(huán)境變量
今天給大家?guī)淼氖顷P(guān)于Java的相關(guān)知識,文章圍繞著Win10系統(tǒng)下配置Java環(huán)境變量展開,文中有非常詳細(xì)的介紹及圖文示例,需要的朋友可以參考下2021-06-06