欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot+SpringBatch+Quartz整合定時批量任務(wù)方式

 更新時間:2021年09月07日 15:00:08   作者:止步前行  
這篇文章主要介紹了SpringBoot+SpringBatch+Quartz整合定時批量任務(wù)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

一、引言

最近一周,被借調(diào)到其他部門,趕一個緊急需求,需求內(nèi)容如下:

PC網(wǎng)頁觸發(fā)一條設(shè)備升級記錄(下圖),后臺要定時批量設(shè)備更新。這里定時要用到Quartz,批量數(shù)據(jù)處理要用到SpringBatch,二者結(jié)合,可以完成該需求。

由于之前,沒有用過SpringBatch,于是上網(wǎng)查了下資料,發(fā)現(xiàn)可參考的不是很多,于是只能去慢慢的翻看官方文檔。

遇到不少問題,就記錄一下吧。

在這里插入圖片描述

二、代碼具體實現(xiàn)

1、pom文件

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
   </dependencies>

2、application.yaml文件

spring:
  datasource:
    username: thinklink
    password: thinklink
    url: jdbc:postgresql://172.16.205.54:5432/thinklink
    driver-class-name: org.postgresql.Driver
  batch:
    job:
      enabled: false
server:
  port: 8073
#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
# 每次批量處理的數(shù)據(jù)量,默認(rèn)為5000
batch-size: 5000

3、Service實現(xiàn)類

觸發(fā)批處理任務(wù)的入口,執(zhí)行一個job

@Service("batchService")
public class BatchServiceImpl implements BatchService {
	// 框架自動注入
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job updateDeviceJob;
    /**
     * 根據(jù) taskId 創(chuàng)建一個Job
     * @param taskId
     * @throws Exception
     */
    @Override
    public void createBatchJob(String taskId) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("taskId", taskId)
                .addString("uuid", UUID.randomUUID().toString().replace("-",""))
                .toJobParameters();
        // 傳入一個Job任務(wù)和任務(wù)需要的參數(shù)
        jobLauncher.run(updateDeviceJob, jobParameters);
    }
}

4、SpringBatch配置類

此部分最重要(☆☆☆☆☆)

@Configuration
public class BatchConfiguration {
    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
    @Value("${batch-size:5000}")
    private int batchSize;
	// 框架自動注入
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
	// 框架自動注入
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
	// 數(shù)據(jù)過濾器,對從數(shù)據(jù)庫讀出來的數(shù)據(jù),注意進行操作
    @Autowired
    public TaskItemProcessor taskItemProcessor;
    // 接收job參數(shù)
    public Map<String, JobParameter> parameters;
    public Object taskId;
    @Autowired
    private JdbcTemplate jdbcTemplate;
	// 讀取數(shù)據(jù)庫操作
    @Bean
    @StepScope
    public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
        String querySql = " SELECT " +
                " e. ID AS taskId, " +
                " e.user_id AS userId, " +
                " e.timing_startup AS startTime, " +
                " u.device_id AS deviceId, " +
                " d.app_name AS appName, " +
                " d.compose_file AS composeFile, " +
                " e.failure_retry AS failureRetry, " +
                " e.tetry_times AS retryTimes, " +
                " e.device_managered AS deviceManagered " +
                " FROM " +
                " eiot_upgrade_task e " +
                " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +
                " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +
                " WHERE " +
                " ( " +
                " u.device_upgrade_status = 0 " +
                " OR u.device_upgrade_status = 2" +
                " )" +
                " AND e.tetry_times > u.retry_times " +
                " AND e. ID = ?";
        return new JdbcCursorItemReaderBuilder<DispatchRequest>()
                .name("itemReader")
                .sql(querySql)
                .dataSource(dataSource)
                .queryArguments(new Object[]{parameters.get("taskId").getValue()})
                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
                .build();
    }
	// 將結(jié)果寫回數(shù)據(jù)庫
    @Bean
    @StepScope
    public ItemWriter<ProcessResult> itemWriter() {
        return new ItemWriter<ProcessResult>() {
            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
                log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
                Integer retryTimes = jdbcTemplate.queryForObject(
                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
                        new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
                );
                retryTimes += 1;
                int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
                        "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
                if (updateCount <= 0) {
                    log.warn("no task updated");
                } else {
                    log.info("count of {} task updated", updateCount);
                }
                // 最后一次重試
                if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
                    return 1;
                } else {
                    return 0;
                }
            }
            @Override
            @Transactional
            public void write(List<? extends ProcessResult> list) throws Exception {
                Map taskMap = jdbcTemplate.queryForMap(
                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
                        list.get(0).getDispatchRequest().getTaskId() // 我們認(rèn)定一個批量里面,taskId都是一樣的
                        );
                int deviceManagered = (int)taskMap.get("device_managered");
                Integer deviceCount = (Integer) taskMap.get("device_count");
                if (deviceCount == null) {
                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
                }
                int taskStatus = (int)taskMap.get("task_status");
                for (ProcessResult result: list) {
                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
                }
                if (deviceCount != null && deviceManagered == deviceCount) {
                    taskStatus = 2; //任務(wù)狀態(tài) 0:待升級,1:升級中,2:已完成
                }
                jdbcTemplate.update("update eiot_upgrade_task  set device_managered = ?, task_status = ? " +
                        "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
            }
        };
    }
    /**
     * 定義一個下發(fā)更新的 job
     * @return
     */
    @Bean
    public Job updateDeviceJob(Step updateDeviceStep) {
        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .listener(new JobListener()) // 設(shè)置Job的監(jiān)聽器
                .flow(updateDeviceStep)// 執(zhí)行下發(fā)更新的Step
                .end()
                .build();
    }
    /**
     * 定義一個下發(fā)更新的 step
     * @return
     */
    @Bean
    public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {
        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .<DispatchRequest, ProcessResult> chunk(batchSize)
                .reader(itemReader) //根據(jù)taskId從數(shù)據(jù)庫讀取更新設(shè)備信息
                .processor(taskItemProcessor) // 每條更新信息,執(zhí)行下發(fā)更新接口
                .writer(itemWriter)
                .build();
    }
    // job 監(jiān)聽器
    public class JobListener implements JobExecutionListener {
        @Override
        public void beforeJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " before... ");
            parameters = jobExecution.getJobParameters().getParameters();
            taskId = parameters.get("taskId").getValue();
            log.info("job param taskId : " + parameters.get("taskId"));
        }
        @Override
        public void afterJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " after... ");
            // 當(dāng)所有job執(zhí)行完之后,查詢設(shè)備更新狀態(tài),如果有失敗,則要定時重新執(zhí)行job
            String sql = " SELECT " +
                    " count(*) " +
                    " FROM " +
                    " eiot_upgrade_device d " +
                    " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +
                    " WHERE " +
                    " u. ID = ? " +
                    " AND d.retry_times < u.tetry_times " +
                    " AND ( " +
                    " d.device_upgrade_status = 0 " +
                    " OR d.device_upgrade_status = 2 " +
                    " ) ";
            // 獲取更新失敗的設(shè)備個數(shù)
            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            log.info("update device failure count : " + count);
            // 下面是使用Quartz觸發(fā)定時任務(wù)
            // 獲取任務(wù)時間,單位秒
//            String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            // 此處方便測試,應(yīng)該從數(shù)據(jù)庫中取taskId對應(yīng)的重試間隔,單位秒
            Integer millSecond = 10;
            if(count != null && count > 0){
                String jobName = "UpgradeTask_" + taskId;
                String reTaskId = taskId.toString();
                Map<String,Object> params = new HashMap<>();
                params.put("jobName",jobName);
                params.put("taskId",reTaskId);
                if (QuartzManager.checkNameNotExist(jobName))
                {
                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
                }
            }
        }
    }
}

5、Processor,處理每條數(shù)據(jù)

可以在此對數(shù)據(jù)進行過濾操作

@Component("taskItemProcessor")
public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {
    public static final int STATUS_DISPATCH_FAILED = 2;
    public static final int STATUS_DISPATCH_SUCC = 1;
    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
    private String dispatchUrl;
    @Autowired
    JdbcTemplate jdbcTemplate;
    /**
     * 在這里,執(zhí)行 下發(fā)更新指令 的操作
     * @param dispatchRequest
     * @return
     * @throws Exception
     */
    @Override
    public ProcessResult process(final DispatchRequest dispatchRequest) {
        // 調(diào)用接口,下發(fā)指令
        String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
        log.info("request url:" + url);
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
        JSONObject jsonOuter = new JSONObject();
        JSONObject jsonInner = new JSONObject();
        try {
            jsonInner.put("jobId",dispatchRequest.getTaskId());
            jsonInner.put("name",dispatchRequest.getName());
            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
            jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
            jsonInner.put("timestamp",dispatchRequest.getTimestamp());
            jsonOuter.put("method","updateApp");
            jsonOuter.put("params",jsonInner);
        } catch (JSONException e) {
            log.info("JSON convert Exception :" + e);
        }catch (IOException e) {
            log.info("Base64Util bytesToBase64Str :" + e);
        }
        log.info("request body json :" + jsonOuter);
        HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);
        int status;
        try {
            ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);
            log.info("response :" + response);
            if (response.getStatusCode() == HttpStatus.OK) {
                status = STATUS_DISPATCH_SUCC;
            } else {
                status = STATUS_DISPATCH_FAILED;
            }
        }catch (Exception e){
            status = STATUS_DISPATCH_FAILED;
        }
        return new ProcessResult(dispatchRequest, status);
    }
}

6、封裝數(shù)據(jù)庫返回數(shù)據(jù)的實體Bean

注意靜態(tài)內(nèi)部類

public class DispatchRequest {
    private String taskId;
    private String deviceId;
    private String userId;
    private String name;
    private byte[] composeFile;
    private String policy;
    private String timestamp;
    private String md5;
    private int failureRetry;
    private int retryTimes;
    private int deviceManagered;
   // 省略構(gòu)造函數(shù),setter/getter/tostring方法
   //......
   
    public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
        @Override
        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
            DispatchRequest dispatchRequest = new DispatchRequest();
            dispatchRequest.setTaskId(resultSet.getString("taskId"));
            dispatchRequest.setUserId(resultSet.getString("userId"));
            dispatchRequest.setPolicy(resultSet.getString("startTime"));
            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
            dispatchRequest.setName(resultSet.getString("appName"));
            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
            return dispatchRequest;
        }
    }
}

7、啟動類上要加上注解

@SpringBootApplication
@EnableBatchProcessing
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

三、小結(jié)一下

其實SpringBatch并沒有想象中那么好用,當(dāng)從數(shù)據(jù)庫中每次取5000條數(shù)據(jù)后,進入processor中是逐條處理的,這個時候不能不行操作,等5000條數(shù)據(jù)處理完之后,再一次性執(zhí)行ItemWriter方法。

在使用的過程中,最坑的地方是ItemReader和ItemWriter這兩個地方,如何執(zhí)行自定義的Sql,參考文中代碼就行。至于Quartz定時功能,很簡單,只要定時創(chuàng)建SpringBatch里面的Job,讓這個job啟動就好了,此處就不在給出了,貼的代碼太多了。由于公司一些原因,代碼不能放到GitHub上。

spring-batch與quartz集成過程中遇到的問題

問題

啟動時報Exception

Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB

原因

quartz的driverDelegateClass配置的是OracleDelegate,應(yīng)用運行在weblogic上

解決

driverDelegateClass對應(yīng)配置改為

org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Java?Mybatis?foreach嵌套foreach?List<list<Object>>問題

    Java?Mybatis?foreach嵌套foreach?List<list<Object>&

    在MyBatis的mapper.xml文件中,foreach元素常用于動態(tài)生成SQL查詢條件,此元素包括item(必選,元素別名)、index(可選,元素序號或鍵)、collection(必選,指定迭代對象)、open、separator、close(均為可選,用于定義SQL結(jié)構(gòu))
    2024-09-09
  • JVM的垃圾回收機制你了解嗎

    JVM的垃圾回收機制你了解嗎

    這篇文章主要為大家介紹了JVM的垃圾回收機制,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-01-01
  • 提升性能秘密武器Java Unsafe類面試精講

    提升性能秘密武器Java Unsafe類面試精講

    這篇文章主要為大家介紹了提升性能秘密武器Java Unsafe類面試精講,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-10-10
  • Java微信公眾平臺開發(fā)(4) 回復(fù)消息的分類及實體的創(chuàng)建

    Java微信公眾平臺開發(fā)(4) 回復(fù)消息的分類及實體的創(chuàng)建

    這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺開發(fā)第四步,回復(fù)消息的分類及實體的創(chuàng)建,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • Spring Cloud Netflix架構(gòu)淺析(小結(jié))

    Spring Cloud Netflix架構(gòu)淺析(小結(jié))

    這篇文章主要介紹了Spring Cloud Netflix架構(gòu)淺析(小結(jié)),詳解的介紹了Spring Cloud Netflix的概念和組件等,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-01-01
  • Spring Boot詳解整合JWT教程

    Spring Boot詳解整合JWT教程

    JWT是目前比較流行的跨域認(rèn)證解決方案,本文主要介紹了SpringBoot整合JWT的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-07-07
  • Spring實戰(zhàn)之獲取其他Bean的屬性值操作示例

    Spring實戰(zhàn)之獲取其他Bean的屬性值操作示例

    這篇文章主要介紹了Spring實戰(zhàn)之獲取其他Bean的屬性值操作,結(jié)合實例形式分析了Spring操作Bean屬性值的相關(guān)配置與實現(xiàn)技巧,需要的朋友可以參考下
    2019-12-12
  • Java讀取.properties配置文件的幾種方式

    Java讀取.properties配置文件的幾種方式

    這篇文章主要介紹了Java讀取.properties配置文件的幾種方式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09
  • 使用RedisAtomicLong優(yōu)化性能問題

    使用RedisAtomicLong優(yōu)化性能問題

    這篇文章主要介紹了使用RedisAtomicLong優(yōu)化性能問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • Spring啟動流程refresh()源碼深入解析

    Spring啟動流程refresh()源碼深入解析

    這篇文章主要給大家介紹了關(guān)于Spring啟動流程refresh()源碼深入解析的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09

最新評論