Java線程池流程編排運(yùn)用實(shí)戰(zhàn)源碼
引導(dǎo)語(yǔ)
在線程池的面試中,面試官除了喜歡問(wèn) ThreadPoolExecutor 的底層源碼外,還喜歡問(wèn)你有沒(méi)有在實(shí)際的工作中用過(guò) ThreadPoolExecutor,我們?cè)诓l(fā)集合類的《場(chǎng)景集合:并發(fā) List、Map 的應(yīng)用場(chǎng)景》一文中說(shuō)過(guò)一種簡(jiǎn)單的流程引擎,如果沒(méi)有看過(guò)的同學(xué),可以返回去看一下。
本章就在流程引擎的基礎(chǔ)上運(yùn)用 ThreadPoolExecutor,使用線程池實(shí)現(xiàn) SpringBean 的異步執(zhí)行。
1、流程引擎關(guān)鍵代碼回顧
《場(chǎng)景集合:并發(fā) List、Map 的應(yīng)用場(chǎng)景》文中流程引擎執(zhí)行 SpringBean 的核心代碼為:
// 批量執(zhí)行 Spring Bean private void stageInvoke(String flowName, StageEnum stage, FlowContent content) { List<DomainAbilityBean> domainAbilitys = FlowCenter.flowMap.getOrDefault(flowName, Maps.newHashMap()).get(stage); if (CollectionUtils.isEmpty(domainAbilitys)) { throw new RuntimeException("找不到該流程對(duì)應(yīng)的領(lǐng)域行為" + flowName); } for (DomainAbilityBean domainAbility : domainAbilitys) { // 執(zhí)行 Spring Bean domainAbility.invoke(content); } }
入?yún)⑹?flowName(流程名稱)、stage(階段)、content(上下文),其中 stage 中會(huì)執(zhí)行很多 SpringBean,SpringBean 被執(zhí)行的代碼是 domainAbility.invoke(content)。
2、異步執(zhí)行 SpringBean
從上述代碼中,我們可以看到所有的 SpringBean 都是串行執(zhí)行的,效率較低,我們?cè)趯?shí)際業(yè)務(wù)中發(fā)現(xiàn),有的 SpringBean 完全可以異步執(zhí)行,這樣既能完成業(yè)務(wù)請(qǐng)求,又能減少業(yè)務(wù)處理的 rt,對(duì)于這個(gè)需求,我們條件反射的有了兩個(gè)想法:
需要新開(kāi)線程來(lái)異步執(zhí)行 SpringBean,可以使用 Runable 或者 Callable;業(yè)務(wù)請(qǐng)求量很大,我們不能每次來(lái)一個(gè)請(qǐng)求,就開(kāi)一個(gè)線程,我們應(yīng)該讓線程池來(lái)管理異步執(zhí)行的線程。
于是我們決定使用線程池來(lái)完成這個(gè)需求。
3、如何區(qū)分異步的 SpringBean
我們的 SpringBean 都是實(shí)現(xiàn) DomainAbilityBean 這個(gè)接口的,接口定義如下:
public interface DomainAbilityBean { /** * 領(lǐng)域行為的方法入口 */ FlowContent invoke(FlowContent content); }
從接口定義上來(lái)看,沒(méi)有預(yù)留任何地方來(lái)標(biāo)識(shí)該 SpringBean 應(yīng)該是同步執(zhí)行還是異步執(zhí)行,這時(shí)候我們可以采取注解的方式,我們新建一個(gè)注解,只要 SpringBean 上有該注解,表示該 SpringBean 應(yīng)該異步執(zhí)行,否則應(yīng)該同步執(zhí)行,新建的注解如下:
/** * 異步 SpringBean 執(zhí)行注解 * SpringBean 需要異步執(zhí)行的話,就打上該注解 */ @Target(ElementType.TYPE)// 表示該注解應(yīng)該打在類上 @Retention(RetentionPolicy.RUNTIME) @Documented public @interface AsyncComponent { }
接著我們新建了兩個(gè) SpringBean,并在其中一個(gè) SpringBean 上打上異步的注解,并且打印出執(zhí)行 SpringBean 的線程名稱,如下圖:
圖中實(shí)現(xiàn)了兩個(gè) SpringBean:BeanOne 和 BeanTwo,其中 BeanTwo 被打上了 AsyncComponent 注解,表明 BeanTwo 應(yīng)該被異步執(zhí)行,兩個(gè) SpringBean 都打印出執(zhí)行的線程的名稱。
4、mock 流程引擎數(shù)據(jù)中心
《場(chǎng)景集合:并發(fā) List、Map 的應(yīng)用場(chǎng)景》一文中,我們說(shuō)可以從數(shù)據(jù)庫(kù)中加載出流程引擎需要的數(shù)據(jù),此時(shí)我們 mock 一下,mock 的代碼如下:
@Component public class FlowCenter { /** * flowMap 是共享變量,方便訪問(wèn) */ public static final Map<String, Map<StageEnum, List<DomainAbilityBean>>> flowMap = Maps.newConcurrentMap(); /** * PostConstruct 注解的意思就是 * 在容器啟動(dòng)成功之后,初始化 flowMap */ @PostConstruct public void init() { // 初始化 flowMap mock Map<StageEnum, List<DomainAbilityBean>> stageMap = flowMap.getOrDefault("flow1",Maps.newConcurrentMap()); for (StageEnum value : StageEnum.values()) { List<DomainAbilityBean> domainAbilitys = stageMap.getOrDefault(value, Lists.newCopyOnWriteArrayList()); if(CollectionUtils.isEmpty(domainAbilitys)){ domainAbilitys.addAll(ImmutableList.of( ApplicationContextHelper.getBean(BeanOne.class), ApplicationContextHelper.getBean(BeanTwo.class) )); stageMap.put(value,domainAbilitys); } } flowMap.put("flow1",stageMap); // 打印出加載完成之后的數(shù)據(jù)結(jié)果 log.info("init success,flowMap is {}", JSON.toJSONString(flowMap)); } }
5、新建線程池
在以上操作完成之后,只剩下最后一步了,之前我們執(zhí)行 SpringBean 時(shí),是這行代碼:domainAbility.invoke(content);
現(xiàn)在我們需要區(qū)分 SpringBean 是否是異步的,如果是異步的,丟到線程池中去執(zhí)行,如果是同步的,仍然使用原來(lái)的方法進(jìn)行執(zhí)行,于是我們把這些邏輯封裝到一個(gè)工具類中,工具類如下:
public class ComponentExecutor { // 我們新建了一個(gè)線程池 private static ExecutorService executor = new ThreadPoolExecutor(15, 15, 365L, TimeUnit.DAYS, new LinkedBlockingQueue<>()); // 如果 SpringBean 上有 AsyncComponent 注解,表示該 SpringBean 需要異步執(zhí)行,就丟到線程池中去 public static final void run(DomainAbilityBean component, FlowContent content) { // 判斷類上是否有 AsyncComponent 注解 if (AnnotationUtils.isAnnotationPresent(AsyncComponent.class, AopUtils.getTargetClass(component))) { // 提交到線程池中 executor.submit(() -> { component.invoke(content); }); return; } // 同步 SpringBean 直接執(zhí)行。 component.invoke(content); } }
我們把原來(lái)的執(zhí)行代碼替換成使用組件執(zhí)行器執(zhí)行,如下圖:
6、測(cè)試
以上步驟完成之后,簡(jiǎn)單的流程引擎就已經(jīng)完成了,我們簡(jiǎn)單地在項(xiàng)目啟動(dòng)的時(shí)候加上測(cè)試,代碼如下:
更嚴(yán)謹(jǐn)?shù)淖龇?,是?huì)寫(xiě)單元測(cè)試來(lái)測(cè)試流程引擎,為了快一點(diǎn),我們直接在項(xiàng)目啟動(dòng)類上加上了測(cè)試代碼。
運(yùn)行之后的關(guān)鍵結(jié)果如下:
[main] demo.sixth.SynchronizedDemo: SynchronizedDemo init begin [main] demo.sixth.SynchronizedDemo: SynchronizedDemo init end [main] demo.three.flow.FlowCenter : init success,flowMap is {"flow1":{"PARAM_VALID":[{},{}],"AFTER_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"BUSINESS_VALID":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"IN_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}]}} o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup [main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http) [main] demo.DemoApplication : Started DemoApplication in 5.377 seconds (JVM running for 6.105) [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-1] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-1 [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-2] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-2 [pool-1-thread-3] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-3 [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-4] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-4
從運(yùn)行結(jié)果中,我們可以看到 BeanTwo 已經(jīng)被多個(gè)不同的線程異步執(zhí)行了。
7、總結(jié)
這是一個(gè)線程池在簡(jiǎn)單流程引擎上的運(yùn)用實(shí)站,雖然這個(gè)流程引擎看起來(lái)比較簡(jiǎn)單,但在實(shí)際工作中,還是非常好用的,大家可以把代碼拉下來(lái),自己嘗試一下,調(diào)試一下參數(shù),比如當(dāng)我新增 SpringBean 的時(shí)候,流程引擎的表現(xiàn)如何。
以上就是Java線程池流程編排運(yùn)用實(shí)戰(zhàn)源碼的詳細(xì)內(nèi)容,更多關(guān)于Java線程池流程編排運(yùn)用的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA遇到Internal error. Please refer to http://jb. gg/ide/crit
這篇文章主要介紹了IDEA遇到Internal error. Please refer to http://jb. gg/ide/critical-startup-errors的問(wèn)題及解決辦法,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下2020-08-08SpringBoot?AOP統(tǒng)一處理Web請(qǐng)求日志的示例代碼
springboot有很多方法處理日志,例如攔截器,aop切面,service中代碼記錄等,下面這篇文章主要給大家介紹了關(guān)于SpringBoot?AOP統(tǒng)一處理Web請(qǐng)求日志的相關(guān)資料,需要的朋友可以參考下2023-02-02Java String轉(zhuǎn)換時(shí)為null的解決方法
這篇文章主要介紹了Java String轉(zhuǎn)換時(shí)為null的解決方法,需要的朋友可以參考下2017-07-07Spring Boot啟動(dòng)banner定制的步驟詳解
這篇文章主要給大家介紹了關(guān)于Spring Boot啟動(dòng)banner定制的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2018-03-03