欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring?Boot?+?Spring?Batch?實(shí)現(xiàn)批處理任務(wù)的詳細(xì)教程

 更新時(shí)間:2023年08月24日 09:21:32   作者:Java技術(shù)棧  
這篇文章主要介紹了Spring?Boot+Spring?Batch實(shí)現(xiàn)批處理任務(wù),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

前言

概念詞就不多說(shuō)了,我簡(jiǎn)單地介紹下 , spring batch 是一個(gè) 方便使用的 較健全的 批處理 框架。

為什么說(shuō)是方便使用的,因?yàn)檫@是 基于spring的一個(gè)框架,接入簡(jiǎn)單、易理解、流程分明。

為什么說(shuō)是較健全的, 因?yàn)樗峁┝送N覀冊(cè)趯?duì)大批量數(shù)據(jù)進(jìn)行處理時(shí)需要考慮到的 日志跟蹤、事務(wù)粒度調(diào)配、可控執(zhí)行、失敗機(jī)制、重試機(jī)制、數(shù)據(jù)讀寫(xiě)等。

正文

那么回到文章,我們?cè)撈恼聦?huì)帶來(lái)給大家的是什么?(結(jié)合實(shí)例講解那是當(dāng)然的)

從實(shí)現(xiàn)的業(yè)務(wù)場(chǎng)景來(lái)說(shuō),有以下兩個(gè):

  • 從 csv文件 讀取數(shù)據(jù),進(jìn)行業(yè)務(wù)處理再存儲(chǔ)
  • 從 數(shù)據(jù)庫(kù) 讀取數(shù)據(jù),進(jìn)行業(yè)務(wù)處理再存儲(chǔ)

也就是平時(shí)經(jīng)常遇到的數(shù)據(jù)清理或者數(shù)據(jù)過(guò)濾,又或者是數(shù)據(jù)遷移備份等等。大批量的數(shù)據(jù),自己實(shí)現(xiàn)分批處理需要考慮的東西太多了,又不放心,那么使用 Spring Batch 框架 是一個(gè)很好的選擇。

首先,在進(jìn)入實(shí)例教程前,我們看看這次的實(shí)例里,我們使用springboot 整合spring batch 框架,要編碼的東西有什么?

通過(guò)一張簡(jiǎn)單的圖來(lái)了解:

可能大家看到這個(gè)圖,是不是多多少少想起來(lái)定時(shí)任務(wù)框架?確實(shí)有那么點(diǎn)像,但是我必須在這告訴大家,這是一個(gè)批處理框架,不是一個(gè)schuedling 框架。但是前面提到它提供了可執(zhí)行控制,也就是說(shuō),啥時(shí)候執(zhí)行是可控的,那么顯然就是自己可以進(jìn)行擴(kuò)展結(jié)合定時(shí)任務(wù)框架,實(shí)現(xiàn)你心中所想。

ok,回到主題,相信大家能從圖中簡(jiǎn)單明了地看到我們這次實(shí)例,需要實(shí)現(xiàn)的東西有什么了。所以我就不在對(duì)各個(gè)小組件進(jìn)行大批量文字的描述了。

那么我們事不宜遲,開(kāi)始我們的實(shí)例教程。

首先準(zhǔn)備一個(gè)數(shù)據(jù)庫(kù),里面建一張簡(jiǎn)單的表,用于實(shí)例數(shù)據(jù)的寫(xiě)入存儲(chǔ)或者說(shuō)是讀取等等。

bloginfo表

相關(guān)建表sql語(yǔ)句:

CREATE TABLE `bloginfo`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客作者標(biāo)識(shí)',
  `blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客鏈接',
  `blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客標(biāo)題',
  `blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客欄目',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

pom文件里的核心依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!--  spring batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- hibernate validator -->
<dependency>
    <groupId>org.hibernate</groupId>
    <artifactId>hibernate-validator</artifactId>
    <version>6.0.7.Final</version>
</dependency>
<!--  mybatis -->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
<!--  mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<!-- druid數(shù)據(jù)源驅(qū)動(dòng) 1.1.10解決springboot從1.0——2.0版本問(wèn)題-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.18</version>
</dependency>

yml文件:

Spring Boot 基礎(chǔ)就不介紹了,推薦看這個(gè)實(shí)戰(zhàn)項(xiàng)目:

https://github.com/javastacks/spring-boot-best-practice

spring:
  batch:
    job:
#設(shè)置為 false -需要jobLaucher.run執(zhí)行
      enabled: false
    initialize-schema: always
#    table-prefix: my-batch
  datasource:
    druid:
      username: root
      password: root
      url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
      driver-class-name: com.mysql.cj.jdbc.Driver
      initialSize: 5
      minIdle: 5
      maxActive: 20
      maxWait: 60000
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxPoolPreparedStatementPerConnectionSize: 20
      useGlobalDataSourceStat: true
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
server:
  port: 8665

ps:這里我們用到了druid數(shù)據(jù)庫(kù)連接池,其實(shí)有個(gè)小坑,后面文章會(huì)講到。

因?yàn)槲覀冞@次的實(shí)例最終數(shù)據(jù)處理完之后,是寫(xiě)入數(shù)據(jù)庫(kù)存儲(chǔ)(當(dāng)然你也可以輸出到文件等等)。

所以我們前面也建了一張表,pom文件里面我們也整合的mybatis,那么我們?cè)谡蟬pring batch 主要編碼前,我們先把這些關(guān)于數(shù)據(jù)庫(kù)打通用到的簡(jiǎn)單過(guò)一下。

pojo 層

BlogInfo.java :

/**
 * @Author : JCccc
 * @Description :
 **/
public class BlogInfo {
    private Integer id;
    private String blogAuthor;
    private String blogUrl;
    private String blogTitle;
    private String blogItem;
    @Override
    public String toString() {
        return "BlogInfo{" +
                "id=" + id +
                ", blogAuthor='" + blogAuthor + '\'' +
                ", blogUrl='" + blogUrl + '\'' +
                ", blogTitle='" + blogTitle + '\'' +
                ", blogItem='" + blogItem + '\'' +
                '}';
    }
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getBlogAuthor() {
        return blogAuthor;
    }
    public void setBlogAuthor(String blogAuthor) {
        this.blogAuthor = blogAuthor;
    }
    public String getBlogUrl() {
        return blogUrl;
    }
    public void setBlogUrl(String blogUrl) {
        this.blogUrl = blogUrl;
    }
    public String getBlogTitle() {
        return blogTitle;
    }
    public void setBlogTitle(String blogTitle) {
        this.blogTitle = blogTitle;
    }
    public String getBlogItem() {
        return blogItem;
    }
    public void setBlogItem(String blogItem) {
        this.blogItem = blogItem;
    }
}

mapper層

BlogMapper.java :

ps:可以看到這個(gè)實(shí)例我用的是注解的方式,哈哈為了省事,而且我還不寫(xiě)servcie層和impl層,也是為了省事,因?yàn)樵撈恼轮攸c(diǎn)不在這些,所以這些不好的大家不要學(xué)。

import com.example.batchdemo.pojo.BlogInfo;
import org.apache.ibatis.annotations.*;
import java.util.List;
import java.util.Map;
/**
 * @Author : JCccc
 * @Description :
 **/
@Mapper
public interface BlogMapper {
    @Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem )   VALUES ( #{blogAuthor}, #{blogUrl},#{blogTitle},#{blogItem}) ")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(BlogInfo bloginfo);
    @Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}")
     List<BlogInfo> queryInfoById(Map<String , Integer> map);
}

接下來(lái) ,重頭戲,我們開(kāi)始對(duì)前邊那張圖里涉及到的各個(gè)小組件進(jìn)行編碼。

首先創(chuàng)建一個(gè) 配置類(lèi), MyBatchConfig.java

從我起名來(lái)看,可以知道這基本就是咱們整合spring batch 涉及到的一些配置組件都會(huì)寫(xiě)在這里了。

首先我們按照咱們上面的圖來(lái)看,里面包含內(nèi)容有:

JobRepository job的注冊(cè)/存儲(chǔ)器
JobLauncher job的執(zhí)行器
Job job任務(wù),包含一個(gè)或多個(gè)Step
Step 包含(ItemReader、ItemProcessor和ItemWriter)
ItemReader 數(shù)據(jù)讀取器
ItemProcessor 數(shù)據(jù)處理器
ItemWriter 數(shù)據(jù)輸出器

首先,在MyBatchConfig類(lèi)前加入注解:

@Configuration 用于告訴spring,咱們這個(gè)類(lèi)是一個(gè)自定義配置類(lèi),里面很多bean都需要加載到spring容器里面

@EnableBatchProcessing 開(kāi)啟批處理支持

然后開(kāi)始往MyBatchConfig類(lèi)里,編寫(xiě)各個(gè)小組件。

JobRepository

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * JobRepository定義:Job的注冊(cè)容器以及和數(shù)據(jù)庫(kù)打交道(事務(wù)管理等)
 * @param dataSource
 * @param transactionManager
 * @return
 * @throws Exception
 */
@Bean
public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDatabaseType("mysql");
    jobRepositoryFactoryBean.setTransactionManager(transactionManager);
    jobRepositoryFactoryBean.setDataSource(dataSource);
    return jobRepositoryFactoryBean.getObject();
}

JobLauncher

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * jobLauncher定義:job的啟動(dòng)器,綁定相關(guān)的jobRepository
 * @param dataSource
 * @param transactionManager
 * @return
 * @throws Exception
 */
@Bean
public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    // 設(shè)置jobRepository
    jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
    return jobLauncher;
}

Job

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * 定義job
 * @param jobs
 * @param myStep
 * @return
 */
@Bean
public Job myJob(JobBuilderFactory jobs, Step myStep){
    return jobs.get("myJob")
            .incrementer(new RunIdIncrementer())
            .flow(myStep)
            .end()
            .listener(myJobListener())
            .build();
}

對(duì)于Job的運(yùn)行,是可以配置監(jiān)聽(tīng)器的

JobListener

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * 注冊(cè)job監(jiān)聽(tīng)器
 * @return
 */
@Bean
public MyJobListener myJobListener(){
    return new MyJobListener();
}

這是一個(gè)我們自己自定義的監(jiān)聽(tīng)器,所以是單獨(dú)創(chuàng)建的,MyJobListener.java

/**
 * @Author : JCccc
 * @Description :監(jiān)聽(tīng)Job執(zhí)行情況,實(shí)現(xiàn)JobExecutorListener,且在batch配置類(lèi)里,Job的Bean上綁定該監(jiān)聽(tīng)器
 **/
public class MyJobListener implements JobExecutionListener {
    private Logger logger = LoggerFactory.getLogger(MyJobListener.class);
    @Override
    public void beforeJob(JobExecution jobExecution) {
        logger.info("job 開(kāi)始, id={}",jobExecution.getJobId());
    }
    @Override
    public void afterJob(JobExecution jobExecution) {
        logger.info("job 結(jié)束, id={}",jobExecution.getJobId());
    }
}

Step(ItemReader ItemProcessor ItemWriter)

step里面包含數(shù)據(jù)讀取器,數(shù)據(jù)處理器,數(shù)據(jù)輸出器三個(gè)小組件的的實(shí)現(xiàn)。

我們也是一個(gè)個(gè)拆解來(lái)進(jìn)行編寫(xiě)。

文章前邊說(shuō)到,該篇實(shí)現(xiàn)的場(chǎng)景包含兩種,一種是從csv文件讀入大量數(shù)據(jù)進(jìn)行處理,另一種是從數(shù)據(jù)庫(kù)表讀入大量數(shù)據(jù)進(jìn)行處理。

從CSV文件讀取數(shù)據(jù)ItemReader

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * ItemReader定義:讀取文件數(shù)據(jù)+entirty實(shí)體類(lèi)映射
 * @return
 */
@Bean
public ItemReader<BlogInfo> reader(){
    // 使用FlatFileItemReader去讀cvs文件,一行即一條數(shù)據(jù)
    FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>();
    // 設(shè)置文件處在路徑
    reader.setResource(new ClassPathResource("static/bloginfo.csv"));
    // entity與csv數(shù)據(jù)做映射
    reader.setLineMapper(new DefaultLineMapper<BlogInfo>() {
        {
            setLineTokenizer(new DelimitedLineTokenizer() {
                {
                    setNames(new String[]{"blogAuthor","blogUrl","blogTitle","blogItem"});
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() {
                {
                    setTargetType(BlogInfo.class);
                }
            });
        }
    });
    return reader;
}

簡(jiǎn)單代碼解析:

對(duì)于數(shù)據(jù)讀取器 ItemReader ,我們給它安排了一個(gè)讀取監(jiān)聽(tīng)器,創(chuàng)建 MyReadListener.java

/**
 * @Author : JCccc
 * @Description :
 **/
public class MyReadListener implements ItemReadListener<BlogInfo> {
    private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
    @Override
    public void beforeRead() {
    }
    @Override
    public void afterRead(BlogInfo item) {
    }
    @Override
    public void onReadError(Exception ex) {
        try {
            logger.info(format("%s%n", ex.getMessage()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ItemProcessor

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * 注冊(cè)ItemProcessor: 處理數(shù)據(jù)+校驗(yàn)數(shù)據(jù)
 * @return
 */
@Bean
public ItemProcessor<BlogInfo, BlogInfo> processor(){
    MyItemProcessor myItemProcessor = new MyItemProcessor();
    // 設(shè)置校驗(yàn)器
    myItemProcessor.setValidator(myBeanValidator());
    return myItemProcessor;
}

數(shù)據(jù)處理器,是我們自定義的,里面主要是包含我們對(duì)數(shù)據(jù)處理的業(yè)務(wù)邏輯,并且我們?cè)O(shè)置了一些數(shù)據(jù)校驗(yàn)器,我們這里使用 JSR-303的Validator來(lái)作為校驗(yàn)器。

校驗(yàn)器

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * 注冊(cè)校驗(yàn)器
 * @return
 */
@Bean
public MyBeanValidator myBeanValidator(){
    return new MyBeanValidator<BlogInfo>();
}

創(chuàng)建MyItemProcessor.java

ps:里面我的數(shù)據(jù)處理邏輯是,獲取出讀取數(shù)據(jù)里面的每條數(shù)據(jù)的blogItem字段,如果是springboot,那就對(duì)title字段值進(jìn)行替換。

其實(shí)也就是模擬一個(gè)簡(jiǎn)單地?cái)?shù)據(jù)處理場(chǎng)景。

import com.example.batchdemo.pojo.BlogInfo;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;
/**
 * @Author : JCccc
 * @Description :
 **/
public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        /**
         * 需要執(zhí)行super.process(item)才會(huì)調(diào)用自定義校驗(yàn)器
         */
        super.process(item);
        /**
         * 對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單的處理
         */
        if (item.getBlogItem().equals("springboot")) {
            item.setBlogTitle("springboot 系列還請(qǐng)看看我Jc");
        } else {
            item.setBlogTitle("未知系列");
        }
        return item;
    }
}

創(chuàng)建MyBeanValidator.java:

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;
/**
 * @Author : JCccc
 * @Description :
 **/
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {
    private javax.validation.Validator validator;
    @Override
    public void validate(T value) throws ValidationException {
        /**
         * 使用Validator的validate方法校驗(yàn)數(shù)據(jù)
         */
        Set<ConstraintViolation<T>> constraintViolations =
                validator.validate(value);
        if (constraintViolations.size() > 0) {
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage() + "\n");
            }
            throw new ValidationException(message.toString());
        }
    }
    /**
     * 使用JSR-303的Validator來(lái)校驗(yàn)我們的數(shù)據(jù),在此進(jìn)行JSR-303的Validator的初始化
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory =
                Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }
}

ps:其實(shí)該篇文章沒(méi)有使用這個(gè)數(shù)據(jù)校驗(yàn)器,大家想使用的話,可以在實(shí)體類(lèi)上添加一些校驗(yàn)器的注解@NotNull @Max @Email等等。我偏向于直接在處理器里面進(jìn)行處理,想把關(guān)于數(shù)據(jù)處理的代碼都寫(xiě)在一塊。

ItemWriter

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * ItemWriter定義:指定datasource,設(shè)置批量插入sql語(yǔ)句,寫(xiě)入數(shù)據(jù)庫(kù)
 * @param dataSource
 * @return
 */
@Bean
public ItemWriter<BlogInfo> writer(DataSource dataSource){
    // 使用jdbcBcatchItemWrite寫(xiě)數(shù)據(jù)到數(shù)據(jù)庫(kù)中
    JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
    // 設(shè)置有參數(shù)的sql語(yǔ)句
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
    String sql = "insert into bloginfo "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
            +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
    writer.setSql(sql);
    writer.setDataSource(dataSource);
    return writer;
}

簡(jiǎn)單代碼解析:

同樣 對(duì)于數(shù)據(jù)讀取器 ItemWriter ,我們給它也安排了一個(gè)輸出監(jiān)聽(tīng)器,創(chuàng)建 MyWriteListener.java

import com.example.batchdemo.pojo.BlogInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
import static java.lang.String.format;
/**
 * @Author : JCccc
 * @Description :
 **/
public class MyWriteListener implements ItemWriteListener<BlogInfo> {
    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
    @Override
    public void beforeWrite(List<? extends BlogInfo> items) {
    }
    @Override
    public void afterWrite(List<? extends BlogInfo> items) {
    }
    @Override
    public void onWriteError(Exception exception, List<? extends BlogInfo> items) {
        try {
            logger.info(format("%s%n", exception.getMessage()));
            for (BlogInfo message : items) {
                logger.info(format("Failed writing BlogInfo : %s", message.toString()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ItemReader、ItemProcessorItemWriter,這三個(gè)小組件到這里,我們都實(shí)現(xiàn)了,那么接下來(lái)就是把這三個(gè)小組件跟我們的step去綁定起來(lái)。

寫(xiě)在MyBatchConfig類(lèi)里

/**
 * step定義:
 * 包括
 * ItemReader 讀取
 * ItemProcessor  處理
 * ItemWriter 輸出
 * @param stepBuilderFactory
 * @param reader
 * @param writer
 * @param processor
 * @return
 */
@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader,
                 ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor){
    return stepBuilderFactory
            .get("myStep")
            .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫(xiě)入操作)
            .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
            .listener(new MyReadListener())
            .processor(processor)
            .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
            .listener(new MyWriteListener())
            .build();
}

這個(gè)Step,稍作講解。

前邊提到了,spring batch框架,提供了事務(wù)的控制,重啟,檢測(cè)跳過(guò)等等機(jī)制。

那么,這些東西的實(shí)現(xiàn),很多都在于這個(gè)step環(huán)節(jié)的設(shè)置。

首先看到我們代碼出現(xiàn)的第一個(gè)設(shè)置,chunk( 6500 ) ,Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫(xiě)入操作。

沒(méi)錯(cuò),對(duì)于整個(gè)step環(huán)節(jié),就是數(shù)據(jù)的讀取,處理最后到輸出。

這個(gè)chunk機(jī)制里,我們傳入的 6500,也就是是告訴它,讀取處理數(shù)據(jù),累計(jì)達(dá)到 6500條進(jìn)行一次批次處理,去執(zhí)行寫(xiě)入操作。

這個(gè)傳值,是根據(jù)具體業(yè)務(wù)而定,可以是500條一次,1000條一次,也可以是20條一次,50條一次。

通過(guò)一張簡(jiǎn)單的小圖來(lái)幫助理解:

在我們大量數(shù)據(jù)處理,不管是讀取或者說(shuō)是寫(xiě)入,都肯定會(huì)涉及到一些未知或者已知因素導(dǎo)致某條數(shù)據(jù)失敗了。

那么如果說(shuō)咱們啥也不設(shè)置,失敗一條數(shù)據(jù),那么我們就當(dāng)作整個(gè)失敗了?。顯然這個(gè)太不人性,所以spring batch 提供了 retry 和 skip 兩個(gè)設(shè)置(其實(shí)還有restart) ,通過(guò)這兩個(gè)設(shè)置來(lái)人性化地解決一些數(shù)據(jù)操作失敗場(chǎng)景。

retryLimit(3).retry(Exception.class)

沒(méi)錯(cuò),這個(gè)就是設(shè)置重試,當(dāng)出現(xiàn)異常的時(shí)候,重試多少次。我們?cè)O(shè)置為3,也就是說(shuō)當(dāng)一條數(shù)據(jù)操作失敗,那我們會(huì)對(duì)這條數(shù)據(jù)進(jìn)行重試3次,還是失敗就是 當(dāng)做失敗了, 那么我們?nèi)绻信渲胹kip(推薦配置使用),那么這個(gè)數(shù)據(jù)失敗記錄就會(huì)留到給 skip 來(lái)處理。

skip(Exception.class).skipLimit(2)

skip,跳過(guò),也就是說(shuō)我們?nèi)绻O(shè)置3, 那么就是可以容忍 3條數(shù)據(jù)的失敗。只有達(dá)到失敗數(shù)據(jù)達(dá)到3次,我們才中斷這個(gè)step。

對(duì)于失敗的數(shù)據(jù),我們做了相關(guān)的監(jiān)聽(tīng)器以及異常信息記錄,供與后續(xù)手動(dòng)補(bǔ)救。

那么記下來(lái)我們開(kāi)始去調(diào)用這個(gè)批處理job,我們通過(guò)接口去觸發(fā)這個(gè)批處理事件,新建一個(gè)Controller,TestController.java

/**
 * @Author : JCccc
 * @Description :
 **/
@RestController
public class TestController {
    @Autowired
    SimpleJobLauncher jobLauncher;
    @Autowired
    Job myJob;
    @GetMapping("testJob")
    public  void testJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
     //    后置參數(shù):使用JobParameters中綁定參數(shù) addLong  addString 等方法
        JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
        jobLauncher.run(myJob, jobParameters);
    }
}

對(duì)了,我準(zhǔn)備了一個(gè)csv文件 bloginfo.csv,里面大概8萬(wàn)多條數(shù)據(jù),用來(lái)進(jìn)行批處理測(cè)試:

這個(gè)文件的路徑跟我們的數(shù)據(jù)讀取器里面讀取的路徑要一直,

目前我們數(shù)據(jù)庫(kù)是這個(gè)樣子,

接下來(lái)我們把我們的項(xiàng)目啟動(dòng)起來(lái),再看一眼數(shù)據(jù)庫(kù),生成了一些batch用來(lái)跟蹤記錄job的一些數(shù)據(jù)表:

我們來(lái)調(diào)用一下testJob接口,

然后看下數(shù)據(jù)庫(kù),可以看的數(shù)據(jù)全部都進(jìn)行了相關(guān)的邏輯處理并插入到了數(shù)據(jù)庫(kù):

到這里,我們對(duì)Springboot 整合 spring batch 其實(shí)已經(jīng)操作完畢了,也實(shí)現(xiàn)了從csv文件讀取數(shù)據(jù)處理存儲(chǔ)的業(yè)務(wù)場(chǎng)景。

從數(shù)據(jù)庫(kù)讀取數(shù)據(jù)

ps:前排提示使用druid有坑。后面會(huì)講到。

那么接下來(lái)實(shí)現(xiàn)場(chǎng)景,從數(shù)據(jù)庫(kù)表內(nèi)讀取數(shù)據(jù)進(jìn)行處理輸出到新的表里面。

那么基于我們上邊的整合,我們已經(jīng)實(shí)現(xiàn)了

JobRepository job的注冊(cè)/存儲(chǔ)器
JobLauncher job的執(zhí)行器
Job job任務(wù),包含一個(gè)或多個(gè)Step
Step 包含(ItemReader、ItemProcessor和ItemWriter)
ItemReader 數(shù)據(jù)讀取器
ItemProcessor 數(shù)據(jù)處理器
ItemWriter 數(shù)據(jù)輸出器
job 監(jiān)聽(tīng)器
reader 監(jiān)聽(tīng)器
writer 監(jiān)聽(tīng)器
process 數(shù)據(jù)校驗(yàn)器

那么對(duì)于我們新寫(xiě)一個(gè)job完成 一個(gè)新的場(chǎng)景,我們需要全部重寫(xiě)么?

顯然沒(méi)必要,當(dāng)然完全新寫(xiě)一套也是可以的。

那么該篇,對(duì)于一個(gè)新的也出場(chǎng)景,從csv文件讀取數(shù)據(jù)轉(zhuǎn)換到數(shù)據(jù)庫(kù)表讀取數(shù)據(jù),我們重新新建的有:

  • 數(shù)據(jù)讀取器: 原先使用的是 FlatFileItemReader ,我們現(xiàn)在改為使用 MyBatisCursorItemReader
  • 數(shù)據(jù)處理器: 新的場(chǎng)景,業(yè)務(wù)為了好擴(kuò)展,所以我們處理器最好也新建一個(gè)
  • 數(shù)據(jù)輸出器: 新的場(chǎng)景,業(yè)務(wù)為了好擴(kuò)展,所以我們數(shù)據(jù)輸出器最好也新建一個(gè)
  • step的綁定設(shè)置: 新的場(chǎng)景,業(yè)務(wù)為了好擴(kuò)展,所以我們step最好也新建一個(gè)
  • Job: 當(dāng)然是要重新寫(xiě)一個(gè)了

其他我們照用原先的就行,JobRepository,JobLauncher以及各種監(jiān)聽(tīng)器啥的,暫且不重新建了。

新建MyItemProcessorNew.java

import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;
/**
 * @Author : JCccc
 * @Description :
 **/
public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> {
    @Override
    public BlogInfo process(BlogInfo item) throws ValidationException {
        /**
         * 需要執(zhí)行super.process(item)才會(huì)調(diào)用自定義校驗(yàn)器
         */
        super.process(item);
        /**
         * 對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單的處理
         */
        Integer authorId= Integer.valueOf(item.getBlogAuthor());
        if (authorId<20000) {
            item.setBlogTitle("這是都是小于20000的數(shù)據(jù)");
        } else if (authorId>20000 && authorId<30000){
            item.setBlogTitle("這是都是小于30000但是大于20000的數(shù)據(jù)");
        }else {
            item.setBlogTitle("舊書(shū)不厭百回讀");
        }
        return item;
    }
}

然后其他重新定義的小組件,寫(xiě)在MyBatchConfig類(lèi)里:

/**
 * 定義job
 * @param jobs
 * @param stepNew
 * @return
 */
@Bean
public Job myJobNew(JobBuilderFactory jobs, Step stepNew){
    return jobs.get("myJobNew")
            .incrementer(new RunIdIncrementer())
            .flow(stepNew)
            .end()
            .listener(myJobListener())
            .build();
}
@Bean
public Step stepNew(StepBuilderFactory stepBuilderFactory, MyBatisCursorItemReader<BlogInfo> itemReaderNew,
                    ItemWriter<BlogInfo> writerNew, ItemProcessor<BlogInfo, BlogInfo> processorNew){
    return stepBuilderFactory
            .get("stepNew")
            .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫(xiě)入操作)
            .reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10)
            .listener(new MyReadListener())
            .processor(processorNew)
            .writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2)
            .listener(new MyWriteListener())
            .build();
}
@Bean
public ItemProcessor<BlogInfo, BlogInfo> processorNew(){
    MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew();
    // 設(shè)置校驗(yàn)器
    csvItemProcessor.setValidator(myBeanValidator());
    return csvItemProcessor;
}
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Bean
@StepScope
//Spring Batch提供了一個(gè)特殊的bean scope類(lèi)(StepScope:作為一個(gè)自定義的Spring bean scope)。這個(gè)step scope的作用是連接batches的各個(gè)steps。這個(gè)機(jī)制允許配置在Spring的beans當(dāng)steps開(kāi)始時(shí)才實(shí)例化并且允許你為這個(gè)step指定配置和參數(shù)。
public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {
        System.out.println("開(kāi)始查詢數(shù)據(jù)庫(kù)");
        MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();
        reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");
        reader.setSqlSessionFactory(sqlSessionFactory);
         Map<String , Object> map = new HashMap<>();
          map.put("authorId" , Integer.valueOf(authorId));
         reader.setParameterValues(map);
        return reader;
}
/**
 * ItemWriter定義:指定datasource,設(shè)置批量插入sql語(yǔ)句,寫(xiě)入數(shù)據(jù)庫(kù)
 * @param dataSource
 * @return
 */
@Bean
public ItemWriter<BlogInfo> writerNew(DataSource dataSource){
    // 使用jdbcBcatchItemWrite寫(xiě)數(shù)據(jù)到數(shù)據(jù)庫(kù)中
    JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
    // 設(shè)置有參數(shù)的sql語(yǔ)句
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
    String sql = "insert into bloginfonew "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
            +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
    writer.setSql(sql);
    writer.setDataSource(dataSource);
    return writer;
}

代碼需要注意的點(diǎn)

數(shù)據(jù)讀取器 MyBatisCursorItemReader

對(duì)應(yīng)的mapper方法:

數(shù)據(jù)處理器 MyItemProcessorNew:

數(shù)據(jù)輸出器,新插入到別的數(shù)據(jù)庫(kù)表去,特意這樣為了測(cè)試:

當(dāng)然我們的數(shù)據(jù)庫(kù)為了測(cè)試這個(gè)場(chǎng)景,也是新建了一張表,bloginfonew 表。

接下來(lái),我們新寫(xiě)一個(gè)接口來(lái)執(zhí)行新的這個(gè)job:

@Autowired
SimpleJobLauncher jobLauncher;
@Autowired
Job myJobNew;
@GetMapping("testJobNew")
public  void testJobNew(@RequestParam("authorId") String authorId) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
    JobParameters jobParametersNew = new JobParametersBuilder().addLong("timeNew", System.currentTimeMillis())
            .addString("authorId",authorId)
            .toJobParameters();
    jobLauncher.run(myJobNew,jobParametersNew);
}

ok,我們來(lái)調(diào)用一些這個(gè)接口:

看下控制臺(tái):

沒(méi)錯(cuò),這就是失敗的,原因是因?yàn)楦鷇ruid有關(guān),報(bào)了一個(gè)數(shù)據(jù)庫(kù)功能不支持。這是在數(shù)據(jù)讀取的時(shí)候報(bào)的錯(cuò)。

我初步測(cè)試認(rèn)為是MyBatisCursorItemReader ,druid 數(shù)據(jù)庫(kù)連接池不支持。

那么,我們只需要:

注釋掉druid連接池 jar依賴

yml里替換連接池配置

其實(shí)我們不配置其他連接池,springboot 2.X 版本已經(jīng)為我們整合了默認(rèn)的連接池 HikariCP 。

在Springboot2.X版本,數(shù)據(jù)庫(kù)的連接池官方推薦使用HikariCP

如果不是為了druid的那些后臺(tái)監(jiān)控?cái)?shù)據(jù),sql分析等等,完全是優(yōu)先使用HikariCP的。

官方的原話

We preferHikariCPfor its performance and concurrency. If HikariCP is available, we always choose it.

翻譯:

我們更喜歡hikaricpf的性能和并發(fā)性。如果有HikariCP,我們總是選擇它。

所以我們就啥連接池也不配了,使用默認(rèn)的HikariCP 連接池。

推薦一個(gè)開(kāi)源免費(fèi)的 Spring Boot 實(shí)戰(zhàn)項(xiàng)目:

https://github.com/javastacks/spring-boot-best-practice

當(dāng)然你想配,也是可以的:

所以我們剔除掉druid鏈接池后,我們?cè)賮?lái)調(diào)用一下新接口:

可以看到,從數(shù)據(jù)庫(kù)獲取數(shù)據(jù)并進(jìn)行批次處理寫(xiě)入job是成功的:

新的表里面插入的數(shù)據(jù)都進(jìn)行了自己寫(xiě)的邏輯處理:

好了,springboot 整合 spring batch 批處理框架, 就到此吧。

到此這篇關(guān)于Spring Boot + Spring Batch 實(shí)現(xiàn)批處理任務(wù)的文章就介紹到這了,更多相關(guān)Spring Boot Spring Batch 批處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java Poi 在Excel中輸出特殊符號(hào)的實(shí)現(xiàn)方法

    Java Poi 在Excel中輸出特殊符號(hào)的實(shí)現(xiàn)方法

    這篇文章主要介紹了Java Poi 在Excel中輸出特殊符號(hào)的實(shí)現(xiàn)方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-07-07
  • 如何配置Spring Boot中的Jackson序列化

    如何配置Spring Boot中的Jackson序列化

    在開(kāi)發(fā)基于Spring Boot的應(yīng)用程序時(shí),Jackson是默認(rèn)的JSON序列化和反序列化工具,本文將詳細(xì)介紹如何在Spring Boot中配置Jackson,以滿足這些需求,感興趣的朋友一起看看吧
    2025-04-04
  • SSH框架網(wǎng)上商城項(xiàng)目第4戰(zhàn)之EasyUI菜單的實(shí)現(xiàn)

    SSH框架網(wǎng)上商城項(xiàng)目第4戰(zhàn)之EasyUI菜單的實(shí)現(xiàn)

    SSH框架網(wǎng)上商城項(xiàng)目第4戰(zhàn)之EasyUI菜單的實(shí)現(xiàn),本文主要使用EasyUI技術(shù)簡(jiǎn)單實(shí)現(xiàn)后臺(tái)菜單,感興趣的小伙伴們可以參考一下
    2016-05-05
  • Sparsearray稀疏數(shù)組原理及實(shí)例詳解

    Sparsearray稀疏數(shù)組原理及實(shí)例詳解

    這篇文章主要介紹了Sparsearray稀疏數(shù)組原理及實(shí)例詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • Java實(shí)現(xiàn)解析zip壓縮包并獲取文件內(nèi)容

    Java實(shí)現(xiàn)解析zip壓縮包并獲取文件內(nèi)容

    這篇文章主要為大家詳細(xì)介紹了如何利用Java語(yǔ)言實(shí)現(xiàn)頁(yè)面上傳一個(gè)源碼壓縮包,后端將壓縮包解壓,并獲取每個(gè)文件中的內(nèi)容,感興趣的可以動(dòng)手嘗試一下
    2022-07-07
  • Java 實(shí)戰(zhàn)項(xiàng)目錘煉之網(wǎng)上花店商城的實(shí)現(xiàn)流程

    Java 實(shí)戰(zhàn)項(xiàng)目錘煉之網(wǎng)上花店商城的實(shí)現(xiàn)流程

    讀萬(wàn)卷書(shū)不如行萬(wàn)里路,只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+jsp+servlet+mysql+ajax實(shí)現(xiàn)一個(gè)網(wǎng)上花店商城系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平
    2021-11-11
  • Java反射,泛型在Json中的運(yùn)用

    Java反射,泛型在Json中的運(yùn)用

    這篇文章主要介紹了Java反射,泛型在Json中的運(yùn)用,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-12-12
  • Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(11)

    Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(11)

    下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你
    2021-07-07
  • JavaWeb?Servlet實(shí)現(xiàn)文件上傳與下載功能實(shí)例

    JavaWeb?Servlet實(shí)現(xiàn)文件上傳與下載功能實(shí)例

    因自己負(fù)責(zé)的項(xiàng)目中需要實(shí)現(xiàn)文件上傳,所以下面下面這篇文章主要給大家介紹了關(guān)于JavaWeb?Servlet實(shí)現(xiàn)文件上傳與下載功能的相關(guān)資料,需要的朋友可以參考下
    2022-04-04
  • java實(shí)現(xiàn)ATM取款項(xiàng)目

    java實(shí)現(xiàn)ATM取款項(xiàng)目

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)ATM取款項(xiàng)目的實(shí)現(xiàn)代碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-06-06

最新評(píng)論