Spring Batch輕量級(jí)批處理框架實(shí)戰(zhàn)
1 實(shí)戰(zhàn)前的理論基礎(chǔ)
1.1 Spring Batch是什么
Spring Batch 是一個(gè)輕量級(jí)、全面的批處理框架,旨在支持開發(fā)對(duì)企業(yè)系統(tǒng)日常運(yùn)營至關(guān)重要的強(qiáng)大的批處理應(yīng)用程序。同時(shí)使開發(fā)人員在必要時(shí)可以輕松訪問和利用更先進(jìn)的企業(yè)服務(wù)。Spring Batch 不是調(diào)度框架,它旨在與調(diào)度程序一起工作,而不是取代調(diào)度程序。
1.2 Spring Batch能做什么
- 自動(dòng)化、復(fù)雜的大量信息處理,無需用戶交互即可最有效地處理。這些操作通常包括基于時(shí)間的事件(例如月末計(jì)算、通知或通信)。
- 定期應(yīng)用在非常大的數(shù)據(jù)集上重復(fù)處理的復(fù)雜業(yè)務(wù)規(guī)則(例如,保險(xiǎn)福利確定或費(fèi)率調(diào)整)。
- 將從內(nèi)部和外部系統(tǒng)接收的信息集成到記錄系統(tǒng)中,這些信息通常需要以事務(wù)方式進(jìn)行格式化、驗(yàn)證和處理。批處理用于每天為企業(yè)處理數(shù)十億筆交易。
業(yè)務(wù)場景:
- 定期提交批處理
- 并發(fā)批處理:作業(yè)的并行處理
- 分階段的、企業(yè)消息驅(qū)動(dòng)的處理
- 大規(guī)模并行批處理
- 失敗后手動(dòng)或計(jì)劃重啟
- 依賴步驟的順序處理(擴(kuò)展到工作流驅(qū)動(dòng)的批處理)
- 部分處理:跳過記錄(例如,在回滾時(shí))
- 整批事務(wù),適用于小批量或現(xiàn)有存儲(chǔ)過程/腳本的情況
總之Spring batch可以做的:
- 從數(shù)據(jù)庫、文件或隊(duì)列中讀取大量記錄。
- 以某種方式處理數(shù)據(jù)。
- 以修改后的形式寫回?cái)?shù)據(jù)。
1.3 基礎(chǔ)架構(gòu)

1.4 核心概念和抽象

核心概念:一個(gè) Job 有一對(duì)多的Step,每個(gè)步驟都正好有一個(gè) ItemReader、一個(gè)ItemProcessor和 一個(gè)ItemWriter。需要啟動(dòng)作業(yè)(使用 JobLauncher),并且需要存儲(chǔ)有關(guān)當(dāng)前運(yùn)行進(jìn)程的元數(shù)據(jù)(在 中 JobRepository)。
2 各個(gè)組件介紹
2.1 Job
Job是封裝了整個(gè)批處理過程的實(shí)體。與其他 Spring 項(xiàng)目一樣,一個(gè)Job與 XML 配置文件或基于 Java 的配置連接在一起。這種配置可以被稱為“作業(yè)配置”。

可配置項(xiàng):
- 作業(yè)的簡單名稱。
Step實(shí)例的定義和排序。- 作業(yè)是否可重新啟動(dòng)。
2.2 Step
一個(gè)Step是一個(gè)域?qū)ο?,它封裝了批處理作業(yè)的一個(gè)獨(dú)立的、連續(xù)的階段。因此,每個(gè) Job 完全由一個(gè)或多個(gè)步驟組成。一個(gè)Step包含定義和控制實(shí)際批處理所需的所有信息。

一個(gè)StepExecution代表一次嘗試執(zhí)行一個(gè)Step。StepExecution 每次Step運(yùn)行時(shí)都會(huì)創(chuàng)建一個(gè)新的,類似于JobExecution。
2.3 ExecutionContext
一個(gè)ExecutionContext表示由框架持久化和控制的鍵/值對(duì)的集合,以允許開發(fā)人員有一個(gè)地方來存儲(chǔ)范圍為StepExecution對(duì)象或JobExecution對(duì)象的持久狀態(tài)。
2.4 JobRepository
JobRepository是上述所有 Stereotypes 的持久性機(jī)制。它提供了CRUD操作JobLauncher,Job以及Step實(shí)現(xiàn)。當(dāng) Job第一次啟動(dòng),一個(gè)JobExecution被從庫中獲得,并且,執(zhí)行的過程中,StepExecution和JobExecution實(shí)施方式是通過將它們傳遞到存儲(chǔ)庫持續(xù)。
使用 Java 配置時(shí),@EnableBatchProcessing注解提供了一個(gè) JobRepository作為開箱即用自動(dòng)配置的組件之一。
2.5 JobLauncher
JobLauncher表示一個(gè)簡單的接口,用于Job使用給定的 集合 啟動(dòng)JobParameters,如以下示例所示:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
期望實(shí)現(xiàn)JobExecution從 中 獲得有效JobRepository并執(zhí)行Job。
2.6 Item Reader
ItemReader是一種抽象,表示一次檢索Step一個(gè)項(xiàng)目的輸入。當(dāng)ItemReader用完它可以提供的項(xiàng)目時(shí),它通過返回來表明這一點(diǎn)null。
2.7 Item Writer
ItemWriter是一種抽象,表示一次一個(gè)Step、一批或一大塊項(xiàng)目的輸出。通常, anItemWriter不知道它接下來應(yīng)該接收的輸入,并且只知道在其當(dāng)前調(diào)用中傳遞的項(xiàng)目。
2.8 Item Processor
ItemProcessor是表示項(xiàng)目的業(yè)務(wù)處理的抽象。當(dāng)ItemReader讀取一個(gè)項(xiàng)目并ItemWriter寫入它們時(shí),它 ItemProcessor提供了一個(gè)訪問點(diǎn)來轉(zhuǎn)換或應(yīng)用其他業(yè)務(wù)處理。如果在處理該項(xiàng)目時(shí)確定該項(xiàng)目無效,則返回 null表示不應(yīng)寫出該項(xiàng)目。
3 Spring Batch實(shí)戰(zhàn)
下面就利用我們所學(xué)的理論實(shí)現(xiàn)一個(gè)最簡單的Spring Batch批處理項(xiàng)目
3.1 依賴和項(xiàng)目結(jié)構(gòu)以及配置文件
依賴
<!--Spring batch-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- web依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!-- mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- mybatis-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.2.0</version>
</dependency>
項(xiàng)目結(jié)構(gòu)

配置文件
server.port=9000 spring.datasource.url=jdbc:mysql://localhost:3306/test spring.datasource.username=root spring.datasource.password=12345 spring.datasource.driver-class-name=com.mysql.jdbc.Driver
3.2 代碼和數(shù)據(jù)表
數(shù)據(jù)表
CREATE TABLE `student` (
`id` int(100) NOT NULL AUTO_INCREMENT,
`name` varchar(45) DEFAULT NULL,
`age` int(2) DEFAULT NULL,
`address` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT
Student實(shí)體類
/**
* @desc: Student實(shí)體類
* @author: YanMingXin
* @create: 2021/10/15-12:17
**/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@ToString
@TableName("student")
public class Student {
@TableId(value = "id", type = IdType.AUTO)
private Long sId;
@TableField("name")
private String sName;
@TableField("age")
private Integer sAge;
@TableField("address")
private String sAddress;
}
Mapper層
/**
* @desc: Mapper層
* @author: YanMingXin
* @create: 2021/10/15-12:17
**/
@Mapper
@Repository
public interface StudentDao extends BaseMapper<Student> {
}
模擬數(shù)據(jù)庫(文件)中讀取類
/**
* @desc: 模擬數(shù)據(jù)庫中讀取
* @author: YanMingXin
* @create: 2021/10/16-10:13
**/
public class StudentVirtualDao {
/**
* 模擬從數(shù)據(jù)庫中讀取
*
* @return
*/
public List<Student> getStudents() {
ArrayList<Student> students = new ArrayList<>();
students.add(new Student(1L, "zs", 23, "Beijing"));
students.add(new Student(2L, "ls", 23, "Beijing"));
students.add(new Student(3L, "ww", 23, "Beijing"));
students.add(new Student(4L, "zl", 23, "Beijing"));
students.add(new Student(5L, "mq", 23, "Beijing"));
students.add(new Student(6L, "gb", 23, "Beijing"));
students.add(new Student(7L, "lj", 23, "Beijing"));
students.add(new Student(8L, "ss", 23, "Beijing"));
students.add(new Student(9L, "zsdd", 23, "Beijing"));
students.add(new Student(10L, "zss", 23, "Beijing"));
return students;
}
}
Service層接口
/**
* @desc:
* @author: YanMingXin
* @create: 2021/10/15-12:16
**/
public interface StudentService {
List<Student> selectStudentsFromDB();
void insertStudent(Student student);
}
Service層實(shí)現(xiàn)類
/**
* @desc: Service層實(shí)現(xiàn)類
* @author: YanMingXin
* @create: 2021/10/15-12:16
**/
@Service
public class StudentServiceImpl implements StudentService {
@Autowired
private StudentDao studentDao;
@Override
public List<Student> selectStudentsFromDB() {
return studentDao.selectList(null);
}
@Override
public void insertStudent(Student student) {
studentDao.insert(student);
}
}
最核心的配置類BatchConfiguration
/**
* @desc: BatchConfiguration
* @author: YanMingXin
* @create: 2021/10/15-12:25
**/
@Configuration
@EnableBatchProcessing
@SuppressWarnings("all")
public class BatchConfiguration {
/**
* 注入JobBuilderFactory
*/
@Autowired
public JobBuilderFactory jobBuilderFactory;
/**
* 注入StepBuilderFactory
*/
@Autowired
public StepBuilderFactory stepBuilderFactory;
/**
* 注入JobRepository
*/
@Autowired
public JobRepository jobRepository;
/**
* 注入JobLauncher
*/
@Autowired
private JobLauncher jobLauncher;
/**
* 注入自定義StudentService
*/
@Autowired
private StudentService studentService;
/**
* 注入自定義job
*/
@Autowired
private Job studentJob;
/**
* 封裝writer bean
*
* @return
*/
@Bean
public ItemWriter<Student> writer() {
ItemWriter<Student> writer = new ItemWriter() {
@Override
public void write(List list) throws Exception {
//debug發(fā)現(xiàn)是嵌套的List reader的線程List嵌套真正的List
list.forEach((stu) -> {
for (Student student : (ArrayList<Student>) stu) {
studentService.insertStudent(student);
}
});
}
};
return writer;
}
/**
* 封裝reader bean
*
* @return
*/
@Bean
public ItemReader<Student> reader() {
ItemReader<Student> reader = new ItemReader() {
@Override
public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
//模擬數(shù)據(jù)獲取
StudentVirtualDao virtualDao = new StudentVirtualDao();
return virtualDao.getStudents();
}
};
return reader;
}
/**
* 封裝processor bean
*
* @return
*/
@Bean
public ItemProcessor processor() {
ItemProcessor processor = new ItemProcessor() {
@Override
public Object process(Object o) throws Exception {
//debug發(fā)現(xiàn)o就是reader單次單線程讀取的數(shù)據(jù)
return o;
}
};
return processor;
}
/**
* 封裝自定義step
*
* @return
*/
@Bean
public Step studentStepOne() {
return stepBuilderFactory.get("studentStepOne")
.chunk(1)
.reader(reader()) //加入reader
.processor(processor()) //加入processor
.writer(writer())//加入writer
.build();
}
/**
* 封裝自定義job
*
* @return
*/
@Bean
public Job studentJob() {
return jobBuilderFactory.get("studentJob")
.flow(studentStepOne())//加入step
.end()
.build();
}
/**
* 使用spring 定時(shí)任務(wù)執(zhí)行
*/
@Scheduled(fixedRate = 5000)
public void printMessage() {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(studentJob, jobParameters);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.3 測試

項(xiàng)目啟動(dòng)1s之后

看數(shù)據(jù)庫,除了我們實(shí)體類定義的表以外多出來這么多表,這些表都是spring batch自帶的記錄日志和錯(cuò)誤的表,具體的字段含義的有待研究

4 實(shí)戰(zhàn)后的總結(jié)
Spring Batch有非??斓膶懭牒妥x取速度,但是帶來的影響就是非常耗費(fèi)內(nèi)存和數(shù)據(jù)庫連接池的資源如果使用不好的話還會(huì)發(fā)生異常,因此我們要進(jìn)行正確的配置,接下來我們進(jìn)行簡單的源碼探究:
4.1 JobBuilderFactory
job的獲取使用了簡單工廠模式和建造者模式JobBuilderFactory獲取JobBuilder在經(jīng)過配置返回一個(gè)job對(duì)象的實(shí)例,該實(shí)例就是Spring Batch中最頂級(jí)的組件,包含了n和step
public class JobBuilderFactory {
private JobRepository jobRepository;
public JobBuilderFactory(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
//返回JobBuilder
public JobBuilder get(String name) {
JobBuilder builder = new JobBuilder(name).repository(jobRepository);
return builder;
}
}
jobBuilder類
public class JobBuilder extends JobBuilderHelper<JobBuilder> {
/**
* 為指定名稱的作業(yè)創(chuàng)建一個(gè)新的構(gòu)建器
*/
public JobBuilder(String name) {
super(name);
}
/**
* 創(chuàng)建將執(zhí)行步驟或步驟序列的新作業(yè)構(gòu)建器。
*/
public SimpleJobBuilder start(Step step) {
return new SimpleJobBuilder(this).start(step);
}
/**
* 創(chuàng)建將執(zhí)行流的新作業(yè)構(gòu)建器。
*/
public JobFlowBuilder start(Flow flow) {
return new FlowJobBuilder(this).start(flow);
}
/**
* 創(chuàng)建將執(zhí)行步驟或步驟序列的新作業(yè)構(gòu)建器
*/
public JobFlowBuilder flow(Step step) {
return new FlowJobBuilder(this).start(step);
}
}
4.2 StepBuilderFactory
直接看StepBuilder類
public class StepBuilder extends StepBuilderHelper<StepBuilder> {
public StepBuilder(String name) {
super(name);
}
/**
* 用自定義微線程構(gòu)建步驟,不一定是項(xiàng)處理。
*/
public TaskletStepBuilder tasklet(Tasklet tasklet) {
return new TaskletStepBuilder(this).tasklet(tasklet);
}
/**
* 構(gòu)建一個(gè)步驟,按照提供的大小以塊的形式處理項(xiàng)。為了將這一步擴(kuò)展到容錯(cuò),
* 在構(gòu)建器上調(diào)用SimpleStepBuilder的 faultolerant()方法。
* @param <I> 輸入類型
* @param <O> 輸出類型
*/
public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
}
public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
}
public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
}
public PartitionStepBuilder partitioner(Step step) {
return new PartitionStepBuilder(this).step(step);
}
public JobStepBuilder job(Job job) {
return new JobStepBuilder(this).job(job);
}
/**
* 創(chuàng)建將執(zhí)行流的新步驟構(gòu)建器。
*/
public FlowStepBuilder flow(Flow flow) {
return new FlowStepBuilder(this).flow(flow);
}
}
參考文檔:
https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/index.html
https://www.jdon.com/springbatch.html
到此這篇關(guān)于Spring Batch輕量級(jí)批處理框架實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)Spring Batch批處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot + FFmpeg實(shí)現(xiàn)一個(gè)簡單的M3U8切片轉(zhuǎn)碼系統(tǒng)
使用大名鼎鼎的ffmpeg,把視頻文件切片成m3u8,并且通過springboot,可以實(shí)現(xiàn)在線的點(diǎn)播。2021-05-05
Java方法遞歸的形式和常見遞歸算法(方法遞歸結(jié)合File類查找文件)
方法遞歸方法直接調(diào)用自己或者間接調(diào)用自己的形式稱為方法遞歸( recursion),遞歸做為一種算法在程序設(shè)計(jì)語言中廣泛應(yīng)用,這篇文章主要介紹了Java方法遞歸的形式和常見遞歸算法-方法遞歸結(jié)合File類查找文件,需要的朋友可以參考下2023-02-02
java實(shí)現(xiàn)mongodb的數(shù)據(jù)庫連接池
這篇文章主要介紹了基于java實(shí)現(xiàn)mongodb的數(shù)據(jù)庫連接池,Java通過使用mongo-2.7.3.jar包實(shí)現(xiàn)mongodb連接池,感興趣的小伙伴們可以參考一下2015-12-12
解決Spring boot2.0+配置攔截器攔截靜態(tài)資源的問題
這篇文章主要介紹了解決Spring boot2.0+配置攔截器攔截靜態(tài)資源的問題,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-08-08
快速搭建Spring Boot+MyBatis的項(xiàng)目IDEA(附源碼下載)
這篇文章主要介紹了快速搭建Spring Boot+MyBatis的項(xiàng)目IDEA(附源碼下載),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12

