詳解批處理框架之Spring Batch
一、Spring Batch的概念知識
1.1、分層架構(gòu)
Spring Batch
的分層架構(gòu)圖如下:
可以看到它分為三層,分別是:
Application
應(yīng)用層:包含了所有任務(wù)batch jobs
和開發(fā)人員自定義的代碼,主要是根據(jù)項目需要開發(fā)的業(yè)務(wù)流程等。Batch Core
核心層:包含啟動和管理任務(wù)的運(yùn)行環(huán)境類,如JobLauncher
等。Batch Infrastructure
基礎(chǔ)層:上面兩層是建立在基礎(chǔ)層之上的,包含基礎(chǔ)的讀入reader
和寫出writer
、重試框架等。
1.2、關(guān)鍵概念
理解下圖所涉及的概念至關(guān)重要,不然很難進(jìn)行后續(xù)開發(fā)和問題分析。
1.2.1、JobRepository
專門負(fù)責(zé)與數(shù)據(jù)庫打交道,對整個批處理的新增、更新、執(zhí)行進(jìn)行記錄。所以Spring Batch
是需要依賴數(shù)據(jù)庫來管理的。
1.2.2、任務(wù)啟動器JobLauncher
負(fù)責(zé)啟動任務(wù)Job
。
1.2.3、任務(wù)Job
Job
是封裝整個批處理過程的單位,跑一個批處理任務(wù),就是跑一個Job
所定義的內(nèi)容。
上圖介紹了Job
的一些相關(guān)概念:
Job
:封裝處理實體,定義過程邏輯。JobInstance
:Job
的運(yùn)行實例,不同的實例,參數(shù)不同,所以定義好一個Job
后可以通過不同參數(shù)運(yùn)行多次。JobParameters
:與JobInstance
相關(guān)聯(lián)的參數(shù)。JobExecution
:代表Job
的一次實際執(zhí)行,可能成功、可能失敗。
所以,開發(fā)人員要做的事情,就是定義Job
。
1.2.4、步驟Step
Step
是對Job
某個過程的封裝,一個Job
可以包含一個或多個Step
,一步步的Step
按特定邏輯執(zhí)行,才代表Job
執(zhí)行完成。
通過定義Step
來組裝Job
可以更靈活地實現(xiàn)復(fù)雜的業(yè)務(wù)邏輯。
1.2.5、輸入——處理——輸出
所以,定義一個Job
關(guān)鍵是定義好一個或多個Step
,然后把它們組裝好即可。而定義Step
有多種方法,但有一種常用的模型就是輸入——處理——輸出
,即Item Reader
、Item Processor
和Item Writer
。比如通過Item Reader
從文件輸入數(shù)據(jù),然后通過Item Processor
進(jìn)行業(yè)務(wù)處理和數(shù)據(jù)轉(zhuǎn)換,最后通過Item Writer
寫到數(shù)據(jù)庫中去。
Spring Batch
為我們提供了許多開箱即用的Reader
和Writer
,非常方便。
二、代碼實例
理解了基本概念后,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv
文件中讀數(shù)據(jù),處理后輸出到一個csv
文件。
2.1、基本框架
添加依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
需要添加Spring Batch
的依賴,同時使用H2
作為內(nèi)存數(shù)據(jù)庫比較方便,實際生產(chǎn)肯定是要使用外部的數(shù)據(jù)庫,如Oracle
、PostgreSQL
。
入口主類:
@SpringBootApplication @EnableBatchProcessing public class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); } }
也很簡單,只是在Springboot
的基礎(chǔ)上添加注解@EnableBatchProcessing
。
領(lǐng)域?qū)嶓w類Employee
:
package com.pkslow.batch.entity; public class Employee { String id; String firstName; String lastName; }
對應(yīng)的csv
文件內(nèi)容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
2.2、輸入——處理——輸出
2.2.1、讀取ItemReader
因為有多個輸入文件,所以定義如下:
@Value("input/inputData*.csv") private Resource[] inputResources; @Bean public MultiResourceItemReader<Employee> multiResourceItemReader() { MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader; } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳過csv文件第一行,為表頭 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //轉(zhuǎn)換化后的目標(biāo)類 setTargetType(Employee.class); } }); } }); return reader; }
這里使用了FlatFileItemReader
,方便我們從文件讀取數(shù)據(jù)。
2.2.2、處理ItemProcessor
為了簡單演示,處理很簡單,就是把最后一列轉(zhuǎn)為大寫:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; }; }
2.2.3、輸出ItremWriter
比較簡單,代碼及注釋如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv"); @Bean public FlatFileItemWriter<Employee> writer() { FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否為追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //設(shè)置分割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //設(shè)置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer; }
2.3、Step
有了Reader-Processor-Writer
后,就可以定義Step
了:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build(); }
這里有一個chunk
的設(shè)置,值為5
,意思是5條記錄后再提交輸出,可以根據(jù)自己需求定義。
2.4、Job
完成了Step
的編碼,定義Job
就容易了:
@Bean public Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build(); }
2.5、運(yùn)行
完成以上編碼后,執(zhí)行程序,結(jié)果如下:
成功讀取數(shù)據(jù),并將最后字段轉(zhuǎn)為大寫,并輸出到outputData.csv
文件。
三、監(jiān)聽Listener
可以通過Listener
接口對特定事件進(jìn)行監(jiān)聽,以實現(xiàn)更多業(yè)務(wù)功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數(shù)據(jù)等。
我們分別對Read
、Process
和Write
事件進(jìn)行監(jiān)聽,對應(yīng)分別要實現(xiàn)ItemReadListener
接口、ItemProcessListener
接口和ItemWriteListener
接口。因為代碼比較簡單,就是打印一下日志,這里只貼出ItemWriteListener
的實現(xiàn)代碼:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); } }
把實現(xiàn)的監(jiān)聽器listener
整合到Step
中去:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build(); }
執(zhí)行后看一下日志:
這里就能明顯看到之前設(shè)置的chunk
的作用了。Writer
每次是處理5條記錄,如果一條輸出一次,會對IO
造成壓力。
以上就是詳解Spring Batch入門之優(yōu)秀的批處理框架的詳細(xì)內(nèi)容,更多關(guān)于Spring Batch 批處理框架的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis中一對多的xml配置方式(嵌套查詢/嵌套結(jié)果)
這篇文章主要介紹了MyBatis中一對多的xml配置方式(嵌套查詢/嵌套結(jié)果),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03SpringBoot全局異常處理之多個處理器匹配順序(最新推薦)
這篇文章主要介紹了SpringBoot全局異常處理之多個處理器匹配順序(最新推薦),調(diào)試源碼可見匹配順序為:異常層級高者優(yōu)先,再清楚點(diǎn),子類異常處理器優(yōu)先,本文給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧2024-03-03解決IntelliJ IDEA創(chuàng)建spring boot無法連接http://start.spring.io/問題
這篇文章主要介紹了解決IntelliJ IDEA創(chuàng)建spring boot無法連接http://start.spring.io/問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08全面解釋java中StringBuilder、StringBuffer、String類之間的關(guān)系
String的值是不可變的,這就導(dǎo)致每次對String的操作都會生成新的String對象,不僅效率低下,而且大量浪費(fèi)有限的內(nèi)存空間,StringBuffer是可變類,和線程安全的字符串操作類,任何對它指向的字符串的操作都不會產(chǎn)生新的對象,StringBuffer和StringBuilder類功能基本相似2013-01-01Win10系統(tǒng)下配置Java環(huán)境變量
今天給大家?guī)淼氖顷P(guān)于Java的相關(guān)知識,文章圍繞著Win10系統(tǒng)下配置Java環(huán)境變量展開,文中有非常詳細(xì)的介紹及圖文示例,需要的朋友可以參考下2021-06-06