淺談Spring Batch在大型企業(yè)中的最佳實(shí)踐
在大型企業(yè)中,由于業(yè)務(wù)復(fù)雜、數(shù)據(jù)量大、數(shù)據(jù)格式不同、數(shù)據(jù)交互格式繁雜,并非所有的操作都能通過(guò)交互界面進(jìn)行處理。而有一些操作需要定期讀取大批量的數(shù)據(jù),然后進(jìn)行一系列的后續(xù)處理。這樣的過(guò)程就是“批處理”。
批處理應(yīng)用通常有以下特點(diǎn):
- 數(shù)據(jù)量大,從數(shù)萬(wàn)到數(shù)百萬(wàn)甚至上億不等;
- 整個(gè)過(guò)程全部自動(dòng)化,并預(yù)留一定接口進(jìn)行自定義配置;
- 這樣的應(yīng)用通常是周期性運(yùn)行,比如按日、周、月運(yùn)行;
- 對(duì)數(shù)據(jù)處理的準(zhǔn)確性要求高,并且需要容錯(cuò)機(jī)制、回滾機(jī)制、完善的日志監(jiān)控等。
什么是Spring batch
Spring batch是一個(gè)輕量級(jí)的全面的批處理框架,它專為大型企業(yè)而設(shè)計(jì),幫助開發(fā)健壯的批處理應(yīng)用。Spring batch為處理大批量數(shù)據(jù)提供了很多必要的可重用的功能,比如日志追蹤、事務(wù)管理、job執(zhí)行統(tǒng)計(jì)、重啟job和資源管理等。同時(shí)它也提供了優(yōu)化和分片技術(shù)用于實(shí)現(xiàn)高性能的批處理任務(wù)。
它的核心功能包括:
- 事務(wù)管理
- 基于塊的處理過(guò)程
- 聲明式的輸入/輸出操作
- 啟動(dòng)、終止、重啟任務(wù)
- 重試/跳過(guò)任務(wù)
- 基于Web的管理員接口
筆者所在的部門屬于國(guó)外某大型金融公司的CRM部門,在日常工作中我們經(jīng)常需要開發(fā)一些批處理應(yīng)用,對(duì)Spring Batch有著豐富的使用經(jīng)驗(yàn)。近段時(shí)間筆者特意總結(jié)了這些經(jīng)驗(yàn)。
使用Spring Batch 3.0以及Spring Boot
在使用Spring Batch時(shí)推薦使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它做了以下方面的提升:
- 支持JSR-352標(biāo)準(zhǔn)
- 支持Spring4以及Java8
- 增強(qiáng)了Spring Batch Integration的功能
- 支持JobScope
- 支持SQLite
支持Spring4和Java8是一個(gè)重大的提升。這樣就可以使用Spring4引入的Spring boot組件,從而開發(fā)效率方面有了一個(gè)質(zhì)的飛躍。引入Spring-batch框架只需要在build.gradle中加入一行代碼即可:
compile("org.springframework.boot:spring-boot-starter-batch")
而增強(qiáng)Spring Batch Integration的功能后,我們就可以很方便的和Spring家族的其他組件集成,還可以以多種方式來(lái)調(diào)用job,也支持遠(yuǎn)程分區(qū)操作以及遠(yuǎn)程塊處理。
而支持JobScope后我們可以隨時(shí)為對(duì)象注入當(dāng)前Job實(shí)例的上下文信息。只要我們制定Bean的scope為job scope,那么就可以隨時(shí)使用jobParameters和jobExecutionContext等信息。
<bean id="..." class="..." scope="job"> <property name="name" value="#{jobParameters[input]}" /> </bean> <bean id="..." class="..." scope="job"> <property name="name" value="#{jobExecutionContext['input.name']}.txt" /> </bean>
使用Java Config而不是xml的配置方式
之前我們?cè)谂渲胘ob和step的時(shí)候都習(xí)慣用xml的配置方式,但是隨著時(shí)間的推移發(fā)現(xiàn)問題頗多。
- xml文件數(shù)急劇膨脹,配置塊長(zhǎng)且復(fù)雜,可讀性很差;
- xml文件缺少語(yǔ)法檢查,有些低級(jí)錯(cuò)誤只有在運(yùn)行集成測(cè)試的時(shí)候才能發(fā)現(xiàn);
- 在xml文件中進(jìn)行代碼跳轉(zhuǎn)時(shí)IDE的支持力度不夠;
我們漸漸發(fā)現(xiàn)使用純Java類的配置方式更靈活,它是類型安全的,而且IDE的支持更好。在構(gòu)建job或step時(shí)采用的流式語(yǔ)法相比xml更加簡(jiǎn)潔易懂。
@Bean public Step step(){ return stepBuilders.get("step") .<Partner,Partner>chunk(1) .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit(10) .skip(UnknownGenderException.class) .listener(logSkipListener()) .build(); }
在這個(gè)例子中可以很清楚的看到該step的配置,比如reader/processor/writer組件,以及配置了哪些listener等。
本地集成測(cè)試中使用內(nèi)存數(shù)據(jù)庫(kù)
Spring batch在運(yùn)行時(shí)需要數(shù)據(jù)庫(kù)支持,因?yàn)樗枰跀?shù)據(jù)庫(kù)中建立一套schema來(lái)存儲(chǔ)job和step運(yùn)行的統(tǒng)計(jì)信息。而在本地集成測(cè)試中我們可以借助Spring batch提供的內(nèi)存Repository來(lái)存儲(chǔ)Spring batch的任務(wù)執(zhí)行信息,這樣即避免了在本地配置一個(gè)數(shù)據(jù)庫(kù),又可以加快job的執(zhí)行。
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> </bean>
我們?cè)赽uild.gradle中加入對(duì)hsqldb的依賴:
runtime(‘org.hsqldb:hsqldb:2.3.2')
然后在測(cè)試類中添加對(duì)DataSource的配置。
@EnableAutoConfiguration @EnableBatchProcessing @DataJpaTest @Import({DataSourceAutoConfiguration.class, BatchAutoConfiguration.class}) public class TestConfiguration { }
并且在applicaton.properties配置中添加初始化Database的配置:
spring.batch.initializer.enable=true
合理的使用Chunk機(jī)制
Spring batch在配置Step時(shí)采用的是基于Chunk的機(jī)制。即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作。這樣可以最大化的優(yōu)化寫入效率,整個(gè)事務(wù)也是基于Chunk來(lái)進(jìn)行。
當(dāng)我們?cè)谛枰獙?shù)據(jù)寫入到文件、數(shù)據(jù)庫(kù)中之類的操作時(shí)可以適當(dāng)設(shè)置Chunk的值以滿足寫入效率最大化。但有些場(chǎng)景下我們的寫入操作其實(shí)是調(diào)用一個(gè)web service或者將消息發(fā)送到某個(gè)消息隊(duì)列中,那么這些場(chǎng)景下我們就需要設(shè)置Chunk的值為1,這樣既可以及時(shí)的處理寫入,也不會(huì)由于整個(gè)Chunk中發(fā)生異常后,在重試時(shí)出現(xiàn)重復(fù)調(diào)用服務(wù)或者重復(fù)發(fā)送消息的情況。
使用Listener來(lái)監(jiān)視job執(zhí)行情況并及時(shí)做相應(yīng)的處理
Spring batch提供了大量的Listener來(lái)對(duì)job的各個(gè)執(zhí)行環(huán)節(jié)進(jìn)行全面的監(jiān)控。
在job層面Spring batch提供了JobExecutionListener接口,其支持在Job開始或結(jié)束時(shí)進(jìn)行一些額外處理。在step層面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同時(shí)對(duì)Retry和Skip操作也提供了RetryListener及SkipListener。
通常我們會(huì)為每個(gè)job都實(shí)現(xiàn)一個(gè)JobExecutionListener,在afterJob操作中我們輸出job的執(zhí)行信息,包括執(zhí)行時(shí)間、job參數(shù)、退出代碼、執(zhí)行的step以及每個(gè)step的詳細(xì)信息。這樣無(wú)論是開發(fā)、測(cè)試還是運(yùn)維人員對(duì)整個(gè)job的執(zhí)行情況了如指掌。
如果某個(gè)step會(huì)發(fā)生skip的操作,我們也會(huì)為其實(shí)現(xiàn)一個(gè)SkipListener,并在其中記錄skip的數(shù)據(jù)條目,用于下一步的處理。
實(shí)現(xiàn)Listener有兩種方式,一種是繼承自相應(yīng)的接口,比如繼承JobExecutionListener接口,另一種是使用annoation(注解)的方式。經(jīng)過(guò)實(shí)踐我們認(rèn)為使用注解的方式更好一些,因?yàn)槭褂媒涌谀阈枰獙?shí)現(xiàn)接口的所有方法,而使用注解則只需要對(duì)相應(yīng)的方法添加annoation即可。
下面的這個(gè)類采用了繼承接口的方式,我們看到其實(shí)我們只用到了第一個(gè)方法,第二個(gè)和第三個(gè)都沒有用到。但是我們必須提供一個(gè)空的實(shí)現(xiàn)。
public class CustomSkipListener implements SkipListener<String, String> { @Override public void onSkipInRead(Throwable t) { // business logic } @Override public void onSkipInWrite(String item, Throwable t) { // no need } @Override public void onSkipInProcess(String item, Throwable t) { // no need } }
而使用annoation的方式可以簡(jiǎn)寫為:
public class CustomSkipListener { @OnSkipInRead public void onSkipInRead(Throwable t) { // business logic } }
使用Retry和Skip增強(qiáng)批處理工作的健壯性
在處理百萬(wàn)級(jí)的數(shù)據(jù)過(guò)程過(guò)程中難免會(huì)出現(xiàn)異常。如果一旦出現(xiàn)異常而導(dǎo)致整個(gè)批處理工作終止的話那么會(huì)導(dǎo)致后續(xù)的數(shù)據(jù)無(wú)法被處理。Spring Batch內(nèi)置了Retry(重試)和Skip(跳過(guò))機(jī)制幫助我們輕松處理各種異常。適合Retry的異常的特點(diǎn)是這些異常可能會(huì)隨著時(shí)間推移而消失,比如數(shù)據(jù)庫(kù)目前有鎖無(wú)法寫入、web服務(wù)當(dāng)前不可用、web服務(wù)滿載等。所以對(duì)這些異常我們可以配置Retry機(jī)制。而有些異常則不應(yīng)該配置Retry,比如解析文件出現(xiàn)異常等,因?yàn)檫@些異常即使Retry也會(huì)始終失敗。
即使Retry多次仍然失敗也無(wú)需讓整個(gè)step失敗,可以對(duì)指定的異常設(shè)置Skip選項(xiàng)從而保證后續(xù)的數(shù)據(jù)能夠被繼續(xù)處理。我們也可以配置SkipLimit選項(xiàng)保證當(dāng)Skip的數(shù)據(jù)條目達(dá)到一定數(shù)量后及時(shí)終止整個(gè)Job。
有時(shí)候我們需要在每次Retry中間隔做一些操作,比如延長(zhǎng)Retry時(shí)間,恢復(fù)操作現(xiàn)場(chǎng)等,Spring Batch提供了BackOffPolicy來(lái)達(dá)到目的。下面是一個(gè)配置了Retry機(jī)制、Skip機(jī)制以及BackOffPolicy的step示例。
@Bean public Step step(){ return stepBuilders.get("step") .<Partner,Partner>chunk(1) .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit(10) .skip(UnknownGenderException.class) .retryLimit(5) .retry(ServiceUnavailableException.class) .backOffPolicy(backoffPolicy) .listener(logSkipListener()) .build(); }
使用自定義的Decider來(lái)實(shí)現(xiàn)Job flow
在Job執(zhí)行過(guò)程中不一定都是順序執(zhí)行的,我們經(jīng)常需要根據(jù)某個(gè)job的輸出數(shù)據(jù)或執(zhí)行結(jié)果來(lái)決定下一步的走向。以前我們會(huì)把一些判斷放置在下游step中進(jìn)行,這樣可能會(huì)導(dǎo)致有些step實(shí)際運(yùn)行了,但其實(shí)并沒有做任何事情。比如一個(gè)step執(zhí)行過(guò)程中會(huì)將失敗的數(shù)據(jù)條目記錄到一個(gè)報(bào)告中,而下一個(gè)step會(huì)判斷有沒有生成報(bào)告,如果生成了報(bào)告則將該報(bào)告發(fā)送給指定聯(lián)系人,如果沒有則不做任何事情。這種情況下可以通過(guò)Decider機(jī)制來(lái)實(shí)現(xiàn)Job的執(zhí)行流程。在Spring batch 3.0中Decider已經(jīng)從Step中獨(dú)立出來(lái),和Step處于同一級(jí)別。
public class ReportDecider implements JobExecutionDecider { @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { if (report.isExist()) { return new FlowExecutionStatus(“SEND"); } return new FlowExecutionStatus(“SKIP"); } }
而在job配置中可以這樣來(lái)使用Decider。這樣整個(gè)Job的執(zhí)行流程會(huì)更加清晰易懂。
public Job job() { return new JobBuilder("petstore") .start(orderProcess()) .next(reportDecider) .on("SEND").to(sendReportStep) .on("SKIP").end().build() .build() }
采用多種機(jī)制加速Job的執(zhí)行
批處理工作處理的數(shù)據(jù)量大,而執(zhí)行窗口一般又要求比較小。所以必須要通過(guò)多種方式來(lái)加速Job的執(zhí)行。一般我們有四種方式來(lái)實(shí)現(xiàn):
- 在單個(gè)step中多線程執(zhí)行任務(wù)
- 并行執(zhí)行不同的Step
- 并行執(zhí)行同一個(gè)Step
- 遠(yuǎn)程執(zhí)行Chunk任務(wù)
在單個(gè)step多線程執(zhí)行任務(wù)可以借助于taskExecutor來(lái)實(shí)現(xiàn)。這種情況適合于reader、writer是線程安全的并且是無(wú)狀態(tài)的場(chǎng)景。我們還可以設(shè)置線程數(shù)量。
public Step step() { return stepBuilders.get("step") .tasklet(tasklet) .throttleLimit(20) .build(); }
上述示例中的tasklet需要實(shí)現(xiàn)TaskExecutor,Spring Batch提供了一個(gè)簡(jiǎn)單的多線程TaskExecutor供我們使用:SimpleAsyncTaskExecutor。
并行執(zhí)行不同的Step在Spring batch中很容易實(shí)現(xiàn),以下是一個(gè)示例:
public Job job() { return stepBuilders.get("parallelSteps") .start(step1) .split(asyncTaskExecutor).add(flow1, flow2) .next(step3) .build(); }
在這個(gè)示例中我們先執(zhí)行step1,然后并行執(zhí)行flow1和flow2,最后再執(zhí)行step3。
Spring batch提供了PartitionStep來(lái)實(shí)現(xiàn)對(duì)同一個(gè)step在多個(gè)進(jìn)程中實(shí)現(xiàn)并行處理。通過(guò)PartitonStep再配合PartitionHandler可以將一個(gè)step擴(kuò)展到多個(gè)Slave上實(shí)現(xiàn)并行運(yùn)行。
遠(yuǎn)程執(zhí)行Chunk任務(wù)則是將某個(gè)Step的processer操作分割到多個(gè)進(jìn)程中,多個(gè)進(jìn)程通過(guò)一些中間件進(jìn)行通訊(比如采用消息的方式)。這種方式適合于Processer是瓶頸而Reader和Writer不是瓶頸的場(chǎng)景。
結(jié)語(yǔ)
Spring Batch對(duì)批處理場(chǎng)景進(jìn)行了合理的抽象,封裝了大量的實(shí)用功能,使用它來(lái)開發(fā)批處理應(yīng)用可以達(dá)到事半功倍的效果。在使用的過(guò)程中我們?nèi)孕枰獔?jiān)持總結(jié)一些最佳實(shí)踐,從而能夠交付高質(zhì)量的可維護(hù)的批處理應(yīng)用,滿足企業(yè)級(jí)應(yīng)用的苛刻要求。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java封裝數(shù)組之動(dòng)態(tài)數(shù)組實(shí)現(xiàn)方法詳解
這篇文章主要介紹了Java封裝數(shù)組之動(dòng)態(tài)數(shù)組實(shí)現(xiàn)方法,結(jié)合實(shí)例形式詳細(xì)分析了java動(dòng)態(tài)數(shù)組的實(shí)現(xiàn)原理、操作步驟與相關(guān)注意事項(xiàng),需要的朋友可以參考下2020-03-03SpringBoot項(xiàng)目打包為JAR文件的實(shí)現(xiàn)
本文主要介紹了SpringBoot項(xiàng)目打包為JAR文件的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-09-09java.net.ConnectException: Connection refused問題解決辦法
這篇文章主要介紹了java.net.ConnectException: Connection refused問題解決辦法的相關(guān)資料,需要的朋友可以參考下2016-12-12Spring Boot集成springfox-swagger2構(gòu)建restful API的方法教程
這篇文章主要給大家介紹了關(guān)于Spring Boot集成springfox-swagger2構(gòu)建restful API的相關(guān)資料,文中介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編一起來(lái)學(xué)習(xí)學(xué)習(xí)吧。2017-06-06@CacheEvict + redis實(shí)現(xiàn)批量刪除緩存
這篇文章主要介紹了@CacheEvict + redis實(shí)現(xiàn)批量刪除緩存方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10基于SpringBoot實(shí)現(xiàn)驗(yàn)證碼功能的代碼及思路
SpringBoot技術(shù)是目前市面上從事JavaEE企業(yè)級(jí)開發(fā)過(guò)程中使用量最大的技術(shù),下面這篇文章主要給大家介紹了如何基于SpringBoot實(shí)現(xiàn)驗(yàn)證碼功能的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-07-07