Spring?Boot?+?Spring?Batch?實現(xiàn)批處理任務的詳細教程
前言
概念詞就不多說了,我簡單地介紹下 , spring batch 是一個 方便使用的 較健全的 批處理 框架。
為什么說是方便使用的,因為這是 基于spring的一個框架,接入簡單、易理解、流程分明。
為什么說是較健全的, 因為它提供了往常我們在對大批量數(shù)據(jù)進行處理時需要考慮到的 日志跟蹤、事務粒度調(diào)配、可控執(zhí)行、失敗機制、重試機制、數(shù)據(jù)讀寫等。
正文
那么回到文章,我們該篇文章將會帶來給大家的是什么?(結(jié)合實例講解那是當然的)
從實現(xiàn)的業(yè)務場景來說,有以下兩個:
- 從 csv文件 讀取數(shù)據(jù),進行業(yè)務處理再存儲
- 從 數(shù)據(jù)庫 讀取數(shù)據(jù),進行業(yè)務處理再存儲
也就是平時經(jīng)常遇到的數(shù)據(jù)清理或者數(shù)據(jù)過濾,又或者是數(shù)據(jù)遷移備份等等。大批量的數(shù)據(jù),自己實現(xiàn)分批處理需要考慮的東西太多了,又不放心,那么使用 Spring Batch 框架 是一個很好的選擇。
首先,在進入實例教程前,我們看看這次的實例里,我們使用springboot 整合spring batch 框架,要編碼的東西有什么?
通過一張簡單的圖來了解:
可能大家看到這個圖,是不是多多少少想起來定時任務框架?確實有那么點像,但是我必須在這告訴大家,這是一個批處理框架,不是一個schuedling 框架。但是前面提到它提供了可執(zhí)行控制,也就是說,啥時候執(zhí)行是可控的,那么顯然就是自己可以進行擴展結(jié)合定時任務框架,實現(xiàn)你心中所想。
ok,回到主題,相信大家能從圖中簡單明了地看到我們這次實例,需要實現(xiàn)的東西有什么了。所以我就不在對各個小組件進行大批量文字的描述了。
那么我們事不宜遲,開始我們的實例教程。
首先準備一個數(shù)據(jù)庫,里面建一張簡單的表,用于實例數(shù)據(jù)的寫入存儲或者說是讀取等等。
bloginfo表
相關建表sql語句:
CREATE TABLE `bloginfo` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵', `blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客作者標識', `blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客鏈接', `blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客標題', `blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客欄目', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
pom文件里的核心依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- spring batch --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!-- hibernate validator --> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>6.0.7.Final</version> </dependency> <!-- mybatis --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency> <!-- mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- druid數(shù)據(jù)源驅(qū)動 1.1.10解決springboot從1.0——2.0版本問題--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.18</version> </dependency>
yml文件:
Spring Boot 基礎就不介紹了,推薦看這個實戰(zhàn)項目:
https://github.com/javastacks/spring-boot-best-practice
spring: batch: job: #設置為 false -需要jobLaucher.run執(zhí)行 enabled: false initialize-schema: always # table-prefix: my-batch datasource: druid: username: root password: root url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull driver-class-name: com.mysql.cj.jdbc.Driver initialSize: 5 minIdle: 5 maxActive: 20 maxWait: 60000 timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 300000 validationQuery: SELECT 1 FROM DUAL testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 useGlobalDataSourceStat: true connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 server: port: 8665
ps:這里我們用到了druid數(shù)據(jù)庫連接池,其實有個小坑,后面文章會講到。
因為我們這次的實例最終數(shù)據(jù)處理完之后,是寫入數(shù)據(jù)庫存儲(當然你也可以輸出到文件等等)。
所以我們前面也建了一張表,pom文件里面我們也整合的mybatis,那么我們在整合spring batch 主要編碼前,我們先把這些關于數(shù)據(jù)庫打通用到的簡單過一下。
pojo 層
BlogInfo.java :
/** * @Author : JCccc * @Description : **/ public class BlogInfo { private Integer id; private String blogAuthor; private String blogUrl; private String blogTitle; private String blogItem; @Override public String toString() { return "BlogInfo{" + "id=" + id + ", blogAuthor='" + blogAuthor + '\'' + ", blogUrl='" + blogUrl + '\'' + ", blogTitle='" + blogTitle + '\'' + ", blogItem='" + blogItem + '\'' + '}'; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getBlogAuthor() { return blogAuthor; } public void setBlogAuthor(String blogAuthor) { this.blogAuthor = blogAuthor; } public String getBlogUrl() { return blogUrl; } public void setBlogUrl(String blogUrl) { this.blogUrl = blogUrl; } public String getBlogTitle() { return blogTitle; } public void setBlogTitle(String blogTitle) { this.blogTitle = blogTitle; } public String getBlogItem() { return blogItem; } public void setBlogItem(String blogItem) { this.blogItem = blogItem; } }
mapper層
BlogMapper.java :
ps:可以看到這個實例我用的是注解的方式,哈哈為了省事,而且我還不寫servcie層和impl層,也是為了省事,因為該篇文章重點不在這些,所以這些不好的大家不要學。
import com.example.batchdemo.pojo.BlogInfo; import org.apache.ibatis.annotations.*; import java.util.List; import java.util.Map; /** * @Author : JCccc * @Description : **/ @Mapper public interface BlogMapper { @Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem ) VALUES ( #{blogAuthor}, #{blogUrl},#{blogTitle},#{blogItem}) ") @Options(useGeneratedKeys = true, keyProperty = "id") int insert(BlogInfo bloginfo); @Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}") List<BlogInfo> queryInfoById(Map<String , Integer> map); }
接下來 ,重頭戲,我們開始對前邊那張圖里涉及到的各個小組件進行編碼。
首先創(chuàng)建一個 配置類, MyBatchConfig.java
:
從我起名來看,可以知道這基本就是咱們整合spring batch 涉及到的一些配置組件都會寫在這里了。
首先我們按照咱們上面的圖來看,里面包含內(nèi)容有:
JobRepository job的注冊/存儲器 JobLauncher job的執(zhí)行器 Job job任務,包含一個或多個Step Step 包含(ItemReader、ItemProcessor和ItemWriter) ItemReader 數(shù)據(jù)讀取器 ItemProcessor 數(shù)據(jù)處理器 ItemWriter 數(shù)據(jù)輸出器
首先,在MyBatchConfig類前加入注解:
@Configuration
用于告訴spring,咱們這個類是一個自定義配置類,里面很多bean都需要加載到spring容器里面
@EnableBatchProcessing
開啟批處理支持
然后開始往MyBatchConfig類里,編寫各個小組件。
JobRepository
寫在MyBatchConfig類里
/** * JobRepository定義:Job的注冊容器以及和數(shù)據(jù)庫打交道(事務管理等) * @param dataSource * @param transactionManager * @return * @throws Exception */ @Bean public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDatabaseType("mysql"); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDataSource(dataSource); return jobRepositoryFactoryBean.getObject(); }
JobLauncher
寫在MyBatchConfig類里
/** * jobLauncher定義:job的啟動器,綁定相關的jobRepository * @param dataSource * @param transactionManager * @return * @throws Exception */ @Bean public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{ SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); // 設置jobRepository jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager)); return jobLauncher; }
Job
寫在MyBatchConfig類里
/** * 定義job * @param jobs * @param myStep * @return */ @Bean public Job myJob(JobBuilderFactory jobs, Step myStep){ return jobs.get("myJob") .incrementer(new RunIdIncrementer()) .flow(myStep) .end() .listener(myJobListener()) .build(); }
對于Job的運行,是可以配置監(jiān)聽器的
JobListener
寫在MyBatchConfig類里
/** * 注冊job監(jiān)聽器 * @return */ @Bean public MyJobListener myJobListener(){ return new MyJobListener(); }
這是一個我們自己自定義的監(jiān)聽器,所以是單獨創(chuàng)建的,MyJobListener.java
:
/** * @Author : JCccc * @Description :監(jiān)聽Job執(zhí)行情況,實現(xiàn)JobExecutorListener,且在batch配置類里,Job的Bean上綁定該監(jiān)聽器 **/ public class MyJobListener implements JobExecutionListener { private Logger logger = LoggerFactory.getLogger(MyJobListener.class); @Override public void beforeJob(JobExecution jobExecution) { logger.info("job 開始, id={}",jobExecution.getJobId()); } @Override public void afterJob(JobExecution jobExecution) { logger.info("job 結(jié)束, id={}",jobExecution.getJobId()); } }
Step(ItemReader ItemProcessor ItemWriter)
step里面包含數(shù)據(jù)讀取器,數(shù)據(jù)處理器,數(shù)據(jù)輸出器三個小組件的的實現(xiàn)。
我們也是一個個拆解來進行編寫。
文章前邊說到,該篇實現(xiàn)的場景包含兩種,一種是從csv文件讀入大量數(shù)據(jù)進行處理,另一種是從數(shù)據(jù)庫表讀入大量數(shù)據(jù)進行處理。
從CSV文件讀取數(shù)據(jù)ItemReader
寫在MyBatchConfig類里
/** * ItemReader定義:讀取文件數(shù)據(jù)+entirty實體類映射 * @return */ @Bean public ItemReader<BlogInfo> reader(){ // 使用FlatFileItemReader去讀cvs文件,一行即一條數(shù)據(jù) FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>(); // 設置文件處在路徑 reader.setResource(new ClassPathResource("static/bloginfo.csv")); // entity與csv數(shù)據(jù)做映射 reader.setLineMapper(new DefaultLineMapper<BlogInfo>() { { setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[]{"blogAuthor","blogUrl","blogTitle","blogItem"}); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() { { setTargetType(BlogInfo.class); } }); } }); return reader; }
簡單代碼解析:
對于數(shù)據(jù)讀取器 ItemReader ,我們給它安排了一個讀取監(jiān)聽器,創(chuàng)建 MyReadListener.java
:
/** * @Author : JCccc * @Description : **/ public class MyReadListener implements ItemReadListener<BlogInfo> { private Logger logger = LoggerFactory.getLogger(MyReadListener.class); @Override public void beforeRead() { } @Override public void afterRead(BlogInfo item) { } @Override public void onReadError(Exception ex) { try { logger.info(format("%s%n", ex.getMessage())); } catch (Exception e) { e.printStackTrace(); } } }
ItemProcessor
寫在MyBatchConfig類里
/** * 注冊ItemProcessor: 處理數(shù)據(jù)+校驗數(shù)據(jù) * @return */ @Bean public ItemProcessor<BlogInfo, BlogInfo> processor(){ MyItemProcessor myItemProcessor = new MyItemProcessor(); // 設置校驗器 myItemProcessor.setValidator(myBeanValidator()); return myItemProcessor; }
數(shù)據(jù)處理器,是我們自定義的,里面主要是包含我們對數(shù)據(jù)處理的業(yè)務邏輯,并且我們設置了一些數(shù)據(jù)校驗器,我們這里使用 JSR-303的Validator來作為校驗器。
校驗器
寫在MyBatchConfig類里
/** * 注冊校驗器 * @return */ @Bean public MyBeanValidator myBeanValidator(){ return new MyBeanValidator<BlogInfo>(); }
創(chuàng)建MyItemProcessor.java
:
ps:里面我的數(shù)據(jù)處理邏輯是,獲取出讀取數(shù)據(jù)里面的每條數(shù)據(jù)的blogItem字段,如果是springboot,那就對title字段值進行替換。
其實也就是模擬一個簡單地數(shù)據(jù)處理場景。
import com.example.batchdemo.pojo.BlogInfo; import org.springframework.batch.item.validator.ValidatingItemProcessor; import org.springframework.batch.item.validator.ValidationException; /** * @Author : JCccc * @Description : **/ public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> { @Override public BlogInfo process(BlogInfo item) throws ValidationException { /** * 需要執(zhí)行super.process(item)才會調(diào)用自定義校驗器 */ super.process(item); /** * 對數(shù)據(jù)進行簡單的處理 */ if (item.getBlogItem().equals("springboot")) { item.setBlogTitle("springboot 系列還請看看我Jc"); } else { item.setBlogTitle("未知系列"); } return item; } }
創(chuàng)建MyBeanValidator.java:
import org.springframework.batch.item.validator.ValidationException; import org.springframework.batch.item.validator.Validator; import org.springframework.beans.factory.InitializingBean; import javax.validation.ConstraintViolation; import javax.validation.Validation; import javax.validation.ValidatorFactory; import java.util.Set; /** * @Author : JCccc * @Description : **/ public class MyBeanValidator<T> implements Validator<T>, InitializingBean { private javax.validation.Validator validator; @Override public void validate(T value) throws ValidationException { /** * 使用Validator的validate方法校驗數(shù)據(jù) */ Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); if (constraintViolations.size() > 0) { StringBuilder message = new StringBuilder(); for (ConstraintViolation<T> constraintViolation : constraintViolations) { message.append(constraintViolation.getMessage() + "\n"); } throw new ValidationException(message.toString()); } } /** * 使用JSR-303的Validator來校驗我們的數(shù)據(jù),在此進行JSR-303的Validator的初始化 * @throws Exception */ @Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } }
ps:其實該篇文章沒有使用這個數(shù)據(jù)校驗器,大家想使用的話,可以在實體類上添加一些校驗器的注解@NotNull @Max @Email等等。我偏向于直接在處理器里面進行處理,想把關于數(shù)據(jù)處理的代碼都寫在一塊。
ItemWriter
寫在MyBatchConfig類里
/** * ItemWriter定義:指定datasource,設置批量插入sql語句,寫入數(shù)據(jù)庫 * @param dataSource * @return */ @Bean public ItemWriter<BlogInfo> writer(DataSource dataSource){ // 使用jdbcBcatchItemWrite寫數(shù)據(jù)到數(shù)據(jù)庫中 JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>(); // 設置有參數(shù)的sql語句 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>()); String sql = "insert into bloginfo "+" (blogAuthor,blogUrl,blogTitle,blogItem) " +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; }
簡單代碼解析:
同樣 對于數(shù)據(jù)讀取器 ItemWriter ,我們給它也安排了一個輸出監(jiān)聽器,創(chuàng)建 MyWriteListener.java
:
import com.example.batchdemo.pojo.BlogInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.ItemWriteListener; import java.util.List; import static java.lang.String.format; /** * @Author : JCccc * @Description : **/ public class MyWriteListener implements ItemWriteListener<BlogInfo> { private Logger logger = LoggerFactory.getLogger(MyWriteListener.class); @Override public void beforeWrite(List<? extends BlogInfo> items) { } @Override public void afterWrite(List<? extends BlogInfo> items) { } @Override public void onWriteError(Exception exception, List<? extends BlogInfo> items) { try { logger.info(format("%s%n", exception.getMessage())); for (BlogInfo message : items) { logger.info(format("Failed writing BlogInfo : %s", message.toString())); } } catch (Exception e) { e.printStackTrace(); } } }
ItemReader
、ItemProcessor
、ItemWriter
,這三個小組件到這里,我們都實現(xiàn)了,那么接下來就是把這三個小組件跟我們的step去綁定起來。
寫在MyBatchConfig類里
/** * step定義: * 包括 * ItemReader 讀取 * ItemProcessor 處理 * ItemWriter 輸出 * @param stepBuilderFactory * @param reader * @param writer * @param processor * @return */ @Bean public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader, ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor){ return stepBuilderFactory .get("myStep") .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進行寫入操作) .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2) .listener(new MyReadListener()) .processor(processor) .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2) .listener(new MyWriteListener()) .build(); }
這個Step,稍作講解。
前邊提到了,spring batch框架,提供了事務的控制,重啟,檢測跳過等等機制。
那么,這些東西的實現(xiàn),很多都在于這個step環(huán)節(jié)的設置。
首先看到我們代碼出現(xiàn)的第一個設置,chunk( 6500 )
,Chunk的機制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進行寫入操作。
沒錯,對于整個step環(huán)節(jié),就是數(shù)據(jù)的讀取,處理最后到輸出。
這個chunk機制里,我們傳入的 6500,也就是是告訴它,讀取處理數(shù)據(jù),累計達到 6500條進行一次批次處理,去執(zhí)行寫入操作。
這個傳值,是根據(jù)具體業(yè)務而定,可以是500條一次,1000條一次,也可以是20條一次,50條一次。
通過一張簡單的小圖來幫助理解:
在我們大量數(shù)據(jù)處理,不管是讀取或者說是寫入,都肯定會涉及到一些未知或者已知因素導致某條數(shù)據(jù)失敗了。
那么如果說咱們啥也不設置,失敗一條數(shù)據(jù),那么我們就當作整個失敗了?。顯然這個太不人性,所以spring batch 提供了 retry 和 skip 兩個設置(其實還有restart) ,通過這兩個設置來人性化地解決一些數(shù)據(jù)操作失敗場景。
retryLimit(3).retry(Exception.class)
沒錯,這個就是設置重試,當出現(xiàn)異常的時候,重試多少次。我們設置為3,也就是說當一條數(shù)據(jù)操作失敗,那我們會對這條數(shù)據(jù)進行重試3次,還是失敗就是 當做失敗了, 那么我們?nèi)绻信渲胹kip(推薦配置使用),那么這個數(shù)據(jù)失敗記錄就會留到給 skip 來處理。
skip(Exception.class).skipLimit(2)
skip,跳過,也就是說我們?nèi)绻O置3, 那么就是可以容忍 3條數(shù)據(jù)的失敗。只有達到失敗數(shù)據(jù)達到3次,我們才中斷這個step。
對于失敗的數(shù)據(jù),我們做了相關的監(jiān)聽器以及異常信息記錄,供與后續(xù)手動補救。
那么記下來我們開始去調(diào)用這個批處理job,我們通過接口去觸發(fā)這個批處理事件,新建一個Controller,TestController.java
:
/** * @Author : JCccc * @Description : **/ @RestController public class TestController { @Autowired SimpleJobLauncher jobLauncher; @Autowired Job myJob; @GetMapping("testJob") public void testJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { // 后置參數(shù):使用JobParameters中綁定參數(shù) addLong addString 等方法 JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); jobLauncher.run(myJob, jobParameters); } }
對了,我準備了一個csv文件 bloginfo.csv
,里面大概8萬多條數(shù)據(jù),用來進行批處理測試:
這個文件的路徑跟我們的數(shù)據(jù)讀取器里面讀取的路徑要一直,
目前我們數(shù)據(jù)庫是這個樣子,
接下來我們把我們的項目啟動起來,再看一眼數(shù)據(jù)庫,生成了一些batch用來跟蹤記錄job的一些數(shù)據(jù)表:
我們來調(diào)用一下testJob接口,
然后看下數(shù)據(jù)庫,可以看的數(shù)據(jù)全部都進行了相關的邏輯處理并插入到了數(shù)據(jù)庫:
到這里,我們對Springboot 整合 spring batch 其實已經(jīng)操作完畢了,也實現(xiàn)了從csv文件讀取數(shù)據(jù)處理存儲的業(yè)務場景。
從數(shù)據(jù)庫讀取數(shù)據(jù)
ps:前排提示使用druid有坑。后面會講到。
那么接下來實現(xiàn)場景,從數(shù)據(jù)庫表內(nèi)讀取數(shù)據(jù)進行處理輸出到新的表里面。
那么基于我們上邊的整合,我們已經(jīng)實現(xiàn)了
JobRepository job的注冊/存儲器 JobLauncher job的執(zhí)行器 Job job任務,包含一個或多個Step Step 包含(ItemReader、ItemProcessor和ItemWriter) ItemReader 數(shù)據(jù)讀取器 ItemProcessor 數(shù)據(jù)處理器 ItemWriter 數(shù)據(jù)輸出器 job 監(jiān)聽器 reader 監(jiān)聽器 writer 監(jiān)聽器 process 數(shù)據(jù)校驗器
那么對于我們新寫一個job完成 一個新的場景,我們需要全部重寫么?
顯然沒必要,當然完全新寫一套也是可以的。
那么該篇,對于一個新的也出場景,從csv文件讀取數(shù)據(jù)轉(zhuǎn)換到數(shù)據(jù)庫表讀取數(shù)據(jù),我們重新新建的有:
- 數(shù)據(jù)讀取器: 原先使用的是
FlatFileItemReader
,我們現(xiàn)在改為使用MyBatisCursorItemReader
- 數(shù)據(jù)處理器: 新的場景,業(yè)務為了好擴展,所以我們處理器最好也新建一個
- 數(shù)據(jù)輸出器: 新的場景,業(yè)務為了好擴展,所以我們數(shù)據(jù)輸出器最好也新建一個
- step的綁定設置: 新的場景,業(yè)務為了好擴展,所以我們step最好也新建一個
- Job: 當然是要重新寫一個了
其他我們照用原先的就行,JobRepository,JobLauncher以及各種監(jiān)聽器啥的,暫且不重新建了。
新建MyItemProcessorNew.java
:
import org.springframework.batch.item.validator.ValidatingItemProcessor; import org.springframework.batch.item.validator.ValidationException; /** * @Author : JCccc * @Description : **/ public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> { @Override public BlogInfo process(BlogInfo item) throws ValidationException { /** * 需要執(zhí)行super.process(item)才會調(diào)用自定義校驗器 */ super.process(item); /** * 對數(shù)據(jù)進行簡單的處理 */ Integer authorId= Integer.valueOf(item.getBlogAuthor()); if (authorId<20000) { item.setBlogTitle("這是都是小于20000的數(shù)據(jù)"); } else if (authorId>20000 && authorId<30000){ item.setBlogTitle("這是都是小于30000但是大于20000的數(shù)據(jù)"); }else { item.setBlogTitle("舊書不厭百回讀"); } return item; } }
然后其他重新定義的小組件,寫在MyBatchConfig類里:
/** * 定義job * @param jobs * @param stepNew * @return */ @Bean public Job myJobNew(JobBuilderFactory jobs, Step stepNew){ return jobs.get("myJobNew") .incrementer(new RunIdIncrementer()) .flow(stepNew) .end() .listener(myJobListener()) .build(); } @Bean public Step stepNew(StepBuilderFactory stepBuilderFactory, MyBatisCursorItemReader<BlogInfo> itemReaderNew, ItemWriter<BlogInfo> writerNew, ItemProcessor<BlogInfo, BlogInfo> processorNew){ return stepBuilderFactory .get("stepNew") .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進行寫入操作) .reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10) .listener(new MyReadListener()) .processor(processorNew) .writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2) .listener(new MyWriteListener()) .build(); } @Bean public ItemProcessor<BlogInfo, BlogInfo> processorNew(){ MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew(); // 設置校驗器 csvItemProcessor.setValidator(myBeanValidator()); return csvItemProcessor; } @Autowired private SqlSessionFactory sqlSessionFactory; @Bean @StepScope //Spring Batch提供了一個特殊的bean scope類(StepScope:作為一個自定義的Spring bean scope)。這個step scope的作用是連接batches的各個steps。這個機制允許配置在Spring的beans當steps開始時才實例化并且允許你為這個step指定配置和參數(shù)。 public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) { System.out.println("開始查詢數(shù)據(jù)庫"); MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>(); reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById"); reader.setSqlSessionFactory(sqlSessionFactory); Map<String , Object> map = new HashMap<>(); map.put("authorId" , Integer.valueOf(authorId)); reader.setParameterValues(map); return reader; } /** * ItemWriter定義:指定datasource,設置批量插入sql語句,寫入數(shù)據(jù)庫 * @param dataSource * @return */ @Bean public ItemWriter<BlogInfo> writerNew(DataSource dataSource){ // 使用jdbcBcatchItemWrite寫數(shù)據(jù)到數(shù)據(jù)庫中 JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>(); // 設置有參數(shù)的sql語句 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>()); String sql = "insert into bloginfonew "+" (blogAuthor,blogUrl,blogTitle,blogItem) " +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; }
代碼需要注意的點
數(shù)據(jù)讀取器 MyBatisCursorItemReader
對應的mapper方法:
數(shù)據(jù)處理器 MyItemProcessorNew:
數(shù)據(jù)輸出器,新插入到別的數(shù)據(jù)庫表去,特意這樣為了測試:
當然我們的數(shù)據(jù)庫為了測試這個場景,也是新建了一張表,bloginfonew 表。
接下來,我們新寫一個接口來執(zhí)行新的這個job:
@Autowired SimpleJobLauncher jobLauncher; @Autowired Job myJobNew; @GetMapping("testJobNew") public void testJobNew(@RequestParam("authorId") String authorId) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { JobParameters jobParametersNew = new JobParametersBuilder().addLong("timeNew", System.currentTimeMillis()) .addString("authorId",authorId) .toJobParameters(); jobLauncher.run(myJobNew,jobParametersNew); }
ok,我們來調(diào)用一些這個接口:
看下控制臺:
沒錯,這就是失敗的,原因是因為跟druid有關,報了一個數(shù)據(jù)庫功能不支持。這是在數(shù)據(jù)讀取的時候報的錯。
我初步測試認為是MyBatisCursorItemReader
,druid 數(shù)據(jù)庫連接池不支持。
那么,我們只需要:
注釋掉druid連接池 jar依賴
yml里替換連接池配置
其實我們不配置其他連接池,springboot 2.X 版本已經(jīng)為我們整合了默認的連接池 HikariCP 。
在Springboot2.X版本,數(shù)據(jù)庫的連接池官方推薦使用HikariCP
如果不是為了druid的那些后臺監(jiān)控數(shù)據(jù),sql分析等等,完全是優(yōu)先使用HikariCP的。
官方的原話
We preferHikariCPfor its performance and concurrency. If HikariCP is available, we always choose it.
翻譯:
我們更喜歡hikaricpf的性能和并發(fā)性。如果有HikariCP,我們總是選擇它。
所以我們就啥連接池也不配了,使用默認的HikariCP 連接池。
推薦一個開源免費的 Spring Boot 實戰(zhàn)項目:
https://github.com/javastacks/spring-boot-best-practice
當然你想配,也是可以的:
所以我們剔除掉druid鏈接池后,我們再來調(diào)用一下新接口:
可以看到,從數(shù)據(jù)庫獲取數(shù)據(jù)并進行批次處理寫入job是成功的:
新的表里面插入的數(shù)據(jù)都進行了自己寫的邏輯處理:
好了,springboot 整合 spring batch 批處理框架, 就到此吧。
到此這篇關于Spring Boot + Spring Batch 實現(xiàn)批處理任務的文章就介紹到這了,更多相關Spring Boot Spring Batch 批處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java Poi 在Excel中輸出特殊符號的實現(xiàn)方法
這篇文章主要介紹了Java Poi 在Excel中輸出特殊符號的實現(xiàn)方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07SSH框架網(wǎng)上商城項目第4戰(zhàn)之EasyUI菜單的實現(xiàn)
SSH框架網(wǎng)上商城項目第4戰(zhàn)之EasyUI菜單的實現(xiàn),本文主要使用EasyUI技術簡單實現(xiàn)后臺菜單,感興趣的小伙伴們可以參考一下2016-05-05Java實現(xiàn)解析zip壓縮包并獲取文件內(nèi)容
這篇文章主要為大家詳細介紹了如何利用Java語言實現(xiàn)頁面上傳一個源碼壓縮包,后端將壓縮包解壓,并獲取每個文件中的內(nèi)容,感興趣的可以動手嘗試一下2022-07-07Java 實戰(zhàn)項目錘煉之網(wǎng)上花店商城的實現(xiàn)流程
讀萬卷書不如行萬里路,只學書上的理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+jsp+servlet+mysql+ajax實現(xiàn)一個網(wǎng)上花店商城系統(tǒng),大家可以在過程中查缺補漏,提升水平2021-11-11JavaWeb?Servlet實現(xiàn)文件上傳與下載功能實例
因自己負責的項目中需要實現(xiàn)文件上傳,所以下面下面這篇文章主要給大家介紹了關于JavaWeb?Servlet實現(xiàn)文件上傳與下載功能的相關資料,需要的朋友可以參考下2022-04-04