Java并行執(zhí)行任務(wù)的幾種方案小結(jié)
背景
最近在排查生產(chǎn)環(huán)境問題,發(fā)現(xiàn)商品詳情接口時(shí)不時(shí)會(huì)報(bào)RPC調(diào)用超時(shí),檢查代碼發(fā)現(xiàn)接口里面查詢活動(dòng)耗時(shí)比較長,都是串行執(zhí)行的,仔細(xì)查看發(fā)現(xiàn)完全可以改成并行去執(zhí)行,縮短接口查詢耗時(shí)。
比如我們的商品詳情接口,需要展示立減、階梯滿減、團(tuán)購等活動(dòng)標(biāo)簽。需要查詢?nèi)尾煌幕顒?dòng)信息,再組裝活動(dòng)標(biāo)簽信息。
如果每次查詢耗時(shí)1s,按照串行的方式去調(diào)用,整個(gè)接口下來至少需要3s,整個(gè)耗時(shí),對于我們來講是無法接受的。其實(shí)在jdk中,給我們提供了幾種非常便捷的并行執(zhí)行任務(wù)的方法。
CountDownLatchExecutorService.invokeAll()Fork/Join分而治之 有點(diǎn)類似MapReduce的影子,這個(gè)有興趣的可以自行去了解
改進(jìn)方案
代碼例子:
private void assemblyActivityTag(CartItemDTO itemDTO){
//1.查詢立減活動(dòng)信息,耗時(shí)1s
//2.查詢階梯滿減活動(dòng)信息,耗時(shí)1s
//3.查詢團(tuán)購活動(dòng)信息,耗時(shí)1s
//4.組裝活動(dòng)標(biāo)簽信息,耗時(shí)1s
// 串行執(zhí)行下來整個(gè)耗時(shí)4s
}
CountDownLatch
private void assemblyActivityTag(CartItemDTO itemDTO){
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(3);
executorService.execute(new Runnable() {
@Override
public void run() {
//1.查詢立減活動(dòng)信息
latch.countDown();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
//2.查詢階梯滿減活動(dòng)信息
latch.countDown();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
//3.查詢團(tuán)購活動(dòng)信息
latch.countDown();
}
});
try {
// 一定記得加上timeout時(shí)間,防止阻塞主線程
latch.await(3000,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
//4.等待所有子任務(wù)完成,組裝活動(dòng)標(biāo)簽信息
//5.關(guān)閉線程池
executorService.shutdown();
}
ExecutorService.invokeAll()
private void assemblyActivityTag(CartItemDTO itemDTO) {
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<String>> tasks = Lists.newArrayList();
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
//1.查詢立減活動(dòng)信息
return null;
}
});
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
//2.查詢階梯滿減活動(dòng)信息
return null;
}
});
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
//3.查詢團(tuán)購活動(dòng)信息
return null;
}
});
try {
List<Future<String>> futureList = executorService.invokeAll(tasks, 3000, TimeUnit.MILLISECONDS);
for (Future<String> future : futureList) {
// 獲取線程執(zhí)行結(jié)果
try {
String activityTag = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
//4.組裝活動(dòng)標(biāo)簽信息
//5.關(guān)閉線程池
executorService.shutdown();
}
注意點(diǎn)和區(qū)別
在使用CountDownLatch,盡可能使用線程安全的容器去處理子線程的返回值,避免多線程情況下,出現(xiàn)臟數(shù)據(jù)。
如果想知道每個(gè)子線程的對應(yīng)的返回值,ExecutorService.invokeAll()方式,是沒法區(qū)分的,只能依賴返回值的順序去匹配。
使用上面2種方式時(shí),切記設(shè)置超時(shí)時(shí)間,防止子任務(wù)執(zhí)行時(shí)間過長,阻塞主線程任務(wù)
線程池用完結(jié)束,記得shutdown()
java并行執(zhí)行任務(wù)demo
在一個(gè)方法中同時(shí)調(diào)用多個(gè)方法或者服務(wù),并等待所有結(jié)果返回
package com.test.demo;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
public class TestFuture {
@Test
public void testA(){
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> c());
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> a());
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> b());
try {
//獲取并行執(zhí)行任務(wù)結(jié)果
System.out.println(future3.get());
System.out.println(future1.get());
System.out.println(future2.get());
}catch (Exception e){
}
}
public String a(){
try {
Thread.sleep(1000);
}catch (Exception e){
}
return "a";
}
private String b(){
try {
//模擬業(yè)務(wù)執(zhí)行時(shí)間
Thread.sleep(2000);
}catch (Exception e){
}
return "b";
}
private String c(){
try {
//模擬業(yè)務(wù)執(zhí)行時(shí)間
Thread.sleep(5000);
}catch (Exception e){
}
return "c";
}
}
測試結(jié)果:

從執(zhí)行結(jié)果中可以看到一共耗時(shí)5s,如果同步進(jìn)行執(zhí)行,耗時(shí)應(yīng)該在8s
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
如何使用Idea中的 Deployment 實(shí)現(xiàn)打包自動(dòng)部署
這篇文章主要介紹了使用Idea中的 Deployment 實(shí)現(xiàn)打包自動(dòng)部署,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
解決idea報(bào)錯(cuò) Connot resolve column 的問題
這篇文章主要介紹了解決idea報(bào)錯(cuò) Connot resolve column 的問題,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
Java泛型T,E,K,V,N,?與Object區(qū)別和含義
Java?泛型(generics)是?JDK?5?中引入的一個(gè)新特性,?泛型提供了編譯時(shí)類型安全檢測機(jī)制,該機(jī)制允許程序員在編譯時(shí)檢測到非法的類型。本文將詳細(xì)講講Java泛型T、E、K、V、N、?和Object區(qū)別和含義,需要發(fā)可以參考一下2022-03-03
java從命令行獲取數(shù)據(jù)的三種方式代碼實(shí)例
這篇文章主要介紹了java從命令行獲取數(shù)據(jù)的三種方式代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
基于Java開發(fā)實(shí)現(xiàn)ATM系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了基于Java開發(fā)實(shí)現(xiàn)ATM系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08

