springboot?@Async?注解如何實現(xiàn)方法異步
@Async注解如何實現(xiàn)方法異步
處理大批量數(shù)據(jù)的時候,效率很慢。所以考慮一下使用多線程。
剛開始自己手寫的一套,用了線程池啟動固定的線程數(shù)進(jìn)行跑批。但是后來老大考慮到自己手寫的風(fēng)險不好控制,所以使用spring的方法。
這里沒有詳細(xì)介紹,只有簡單的demo,只會用,不懂原理:
一、springboot的App類需要的注解
package com.xxx.xxx.xxx;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 類功能說明:服務(wù)生產(chǎn)者啟動類
* <p>
* <strong></strong>
* </p>
*
* @version
* @author
* @since 1.8
*/
@Configuration
@EnableAsync
public class Application extends SpringBootServletInitializer {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 設(shè)置核心線程數(shù)
executor.setCorePoolSize(5);
// 設(shè)置最大線程數(shù)
executor.setMaxPoolSize(60);
// 設(shè)置隊列容量
executor.setQueueCapacity(20);
// 設(shè)置線程活躍時間(秒)
executor.setKeepAliveSeconds(60);
// 設(shè)置默認(rèn)線程名稱
executor.setThreadNamePrefix("what-");
// 設(shè)置拒絕策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
springboot的App類,很簡單,就能使用很多東西。
二、service層的注解
package com.xxx.xxx.service.impl;
import java.util.concurrent.Future;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import com.xxx.xxx.service.XXXAsyncService ;
@Service
public class XXXAsyncServiceImpl implements XXXAsyncService {
@Async
public Future<Long> rtn1() throws Exception {
//do something
//有返回值的時候,可以返回string,long之類的。
return new AsyncResult<>(1);
}
@Async
public void rtn2() throws Exception {
//do something
//這個可以沒有返回值.
}
}
三、調(diào)用層
package com.xxx.xxx.controller;
import java.util.concurrent.Future;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.xxx.xxx.service.XXXAsyncService;
@RestController
@RequestMapping(value="/xxx")
public class XXXAsyncController {
@Autowired
private XXXAsyncService xxxAsyncService;
/**
* 這里調(diào)用異步方法
*/
@RequestMapping(value = "/xxx")
public void dodo() throws Exception {
int threads = 10;//十個線程
List<Future<Long>> list = new ArrayList<>();
for(int i = 0;i < threads; i++){
//這里循環(huán)調(diào)用異步方法。
//如果存在大量數(shù)據(jù),可以在這里把數(shù)據(jù)切片,然后循環(huán)調(diào)用,分批處理數(shù)據(jù)。效率杠杠的。
list .add(xxxAsyncService.rtn1());
}
long count = 0;
for(Future<Long> l : tsfCountList) {
//異步調(diào)用需要返回值的時候,這里可以把返回值都放入到list集合中,然后可以統(tǒng)一處理。 這里的get()是阻塞的,因為需要所以異步方法返回,在繼續(xù)執(zhí)行。
count += l.get();
}
System.out.println("調(diào)用次數(shù):" + count);
}
}
這些代碼全是手寫,記錄下來,以后用的時候,省的忘了,查起來麻煩。。
異步注解@Async的使用以及注意事項
第一步開啟異步
@Configuration
@EnableAsync
public class SpringAsyncConfig { ... }
默認(rèn)情況下,@EnableAsync檢測Spring的@Async注釋和EJB 3.1 javax. EJB .異步;此選項還可用于檢測其他用戶定義的注釋類型。(也可以在SpringBoot的啟動類上直接加@EnableAsync注解)
在 Spring 中,用 @Async 注解指定的方法,該方法被調(diào)用時會以異步的方式執(zhí)行。而如果沒有在 @Async 注解中指定線程池,就會使用默認(rèn)的線程池。默認(rèn)的線程池為 SimpleAsyncTaskExecutor 。
該線程池不會復(fù)用線程,每有一個新任務(wù)被提交,該線程池就會創(chuàng)建一個新的線程實例用于執(zhí)行任務(wù)。下面為相關(guān)的代碼:
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
而如果想要指定線程池,可以通過在 @Async 注解中的 value 參數(shù)中指定所要使用的線程池的 Bean Name 。另一種方法是是一個實現(xiàn)了 AsyncConfigurer 接口或是繼承其默認(rèn)適配器類 AsyncConfigurerSupport 的配置類,這樣 @Async 注解的方法就會使用指定的自定義的線程池。
使用@Async注解的話采用的是springBoot默認(rèn)的線程池,不過一般我們會自定義線程池(因為比較靈活),配置方式有:
- 使用 xml 文件配置的方式
- 使用Java代碼結(jié)合@Configuration進(jìn)行配置(推薦使用)
下面顯示配置線程的代碼實現(xiàn)
package com.deppon.ptos.load.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 異步線程管理
* @Author: LYH
* @CreateDate: 2019/6/27 8:54
* @Version: 1.0
* @JDK: 1.8
*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(corePoolSize);
//配置最大線程數(shù)
executor.setMaxPoolSize(maxPoolSize);
//配置隊列大小
executor.setQueueCapacity(queueCapacity);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
}
package com.deppon.ptos.load.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description: 打印異步線程的執(zhí)行情況 使用Callbale Future 來返回線程的信息
* @Author: 633805 LYH
* @CreateDate: 2019/6/27 8:59
* @Version: 1.0
* @JDK: 1.8
*/
@Component
@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private void showThreadPoolInfo(String prefix) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if (null == threadPoolExecutor) {
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
使用:
@Async("asyncServiceExecutor")
到這一步,異步就算開啟了。
下面主要說一說錯誤的
使用@Async導(dǎo)致異步不成功的情況
如下方式會使@Async失效
- 異步方法使用static修飾
- 異步類沒有使用@Component注解(或其他注解)導(dǎo)致spring無法掃描到異步類
- 異步方法不能與被調(diào)用的異步方法在同一個類中
- 類中需要使用@Autowired或@Resource等注解自動注入,不能自己手動new對象
- 如果使用SpringBoot框架必須在啟動類中增加@EnableAsync注解
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
- 簡述Springboot @Async 異步方法
- springboot實現(xiàn)異步調(diào)用@Async的示例
- SpringBoot用@Async注解實現(xiàn)異步任務(wù)
- 詳解springboot使用異步注解@Async獲取執(zhí)行結(jié)果的坑
- SpringBoot異步使用@Async的原理以及線程池配置詳解
- SpringBoot使用@Async注解處理異步事件的方法
- Springboot如何使用@Async實現(xiàn)異步任務(wù)
- SpringBoot使用@Async注解實現(xiàn)異步調(diào)用
- springboot異步@Async的使用及失效場景介紹
相關(guān)文章
Netty分布式抽象編碼器MessageToByteEncoder邏輯分析
這篇文章主要介紹了Netty分布式抽象編碼器MessageToByteEncoder的抽象邏輯分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03
Springboot?手動分頁查詢分批批量插入數(shù)據(jù)的實現(xiàn)流程
這篇文章主要介紹了Springboot?手動分頁查詢分批批量插入數(shù)據(jù)的實現(xiàn)流程,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-07-07
詳解Spring Boot Admin監(jiān)控服務(wù)上下線郵件通知
本篇文章主要介紹了詳解Spring Boot Admin監(jiān)控服務(wù)上下線郵件通知,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12

