Java實(shí)現(xiàn)限定時(shí)間CountDownLatch并行場景
業(yè)務(wù)場景:
一個(gè)用戶數(shù)據(jù)接口,要求在20ms內(nèi)返回?cái)?shù)據(jù),它的調(diào)用邏輯復(fù)雜,關(guān)聯(lián)接口多,需要從3個(gè)接口匯總數(shù)據(jù),這些匯總接口最小耗時(shí)也需要16ms,全部匯總接口最優(yōu)狀態(tài)耗時(shí)需要16ms*3=48ms
解決方案:
使用并行調(diào)用接口,通過多線程同時(shí)獲取結(jié)果集,最后進(jìn)行結(jié)果整合。在這種場景下,使用concurrent包的CountDownLatch完成相關(guān)操作。CountDownLatch本質(zhì)上是一個(gè)計(jì)數(shù)器,把它初始化為與執(zhí)行任務(wù)相同的數(shù)量,當(dāng)一個(gè)任務(wù)執(zhí)行完時(shí),就將計(jì)數(shù)器的值減1,直到計(jì)算器達(dá)到0時(shí),表示完成了所有任務(wù),在await上等待線程就繼續(xù)執(zhí)行。
為上述業(yè)務(wù)場景封裝的工具類,傳入兩個(gè)參數(shù):一個(gè)參數(shù)是計(jì)算的task數(shù)量,另外一個(gè)參數(shù)是整個(gè)大任務(wù)超時(shí)的毫秒數(shù)。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ParallelCollector { private Long timeout; private CountDownLatch countDownLatch; ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200, 1, TimeUnit.HOURS, new ArrayBlockingQueue<>(100)); public ParallelCollector(int taskSize, Long timeOutMill) { countDownLatch = new CountDownLatch(taskSize); timeout = timeOutMill; } public void submitTask(Runnable runnable) { executor.execute(() -> { runnable.run(); countDownLatch.countDown(); }); } public void await() { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } public void destroy() { this.executor.shutdown(); } }
當(dāng)任務(wù)運(yùn)行時(shí)間超過了任務(wù)的時(shí)間上限,就被直接停止,這就是await()的功能。
interface是一個(gè)模擬遠(yuǎn)程服務(wù)的超時(shí)的測試類,程序運(yùn)行后,會(huì)輸出執(zhí)行結(jié)果到map集合。
public class InterfaceMock { private volatile int num=1; public String slowMethod1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return String.valueOf(num+1); }; public String slowMethod2() { return String.valueOf(num+1); }; public String slowMethod3() { return String.valueOf(num+1); }; }
并行執(zhí)行獲取結(jié)果測試類
@SpringBootTest class ThreadPoolApplicationTests { @Test void testTask() { InterfaceMock interfaceMock = new InterfaceMock(); ParallelCollector collector = new ParallelCollector(3, 20L); ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); collector.submitTask(()->map.put("method1",interfaceMock.slowMethod1())); collector.submitTask(()->map.put("method2",interfaceMock.slowMethod2())); collector.submitTask(()->map.put("method3",interfaceMock.slowMethod3())); collector.await(); System.out.println(map.toString()); collector.destroy(); } }
當(dāng)method1()執(zhí)行時(shí)間大于20ms,則該方法直接被終止,結(jié)果map集沒有method1()的結(jié)果,結(jié)果如下:
總結(jié)
使用這種方式,接口能在固定時(shí)間內(nèi)返回,注意CountDownLatch定義數(shù)量是任務(wù)個(gè)數(shù),使用concurrentHashMap避免了并行執(zhí)行時(shí)發(fā)生錯(cuò)亂,造成錯(cuò)誤的結(jié)果的問題。
到此這篇關(guān)于Java實(shí)現(xiàn)限定時(shí)間CountDownLatch并行場景的文章就介紹到這了,更多相關(guān)Java CountDownLatch并行場景內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java小知識(shí)之查詢數(shù)據(jù)庫數(shù)據(jù)的元信息
這篇文章主要給大家介紹了關(guān)于java小知識(shí)之查詢數(shù)據(jù)庫數(shù)據(jù)的元信息,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-10-10解決@ResponseBody作用在返回類型為String的方法時(shí)的坑
這篇文章主要介紹了解決@ResponseBody作用在返回類型為String的方法時(shí)的坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-06-06SpringCloud之分布式配置中心Spring Cloud Config高可用配置實(shí)例代碼
這篇文章主要介紹了SpringCloud之分布式配置中心Spring Cloud Config高可用配置實(shí)例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-04-04java定時(shí)任務(wù)的實(shí)現(xiàn)方法
java定時(shí)任務(wù)的實(shí)現(xiàn)方法,需要的朋友可以參考一下2013-03-03MyBatis中foreach標(biāo)簽的collection屬性的取值方式
這篇文章主要介紹了MyBatis中foreach標(biāo)簽的collection屬性的取值方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-08-08spring Security的自定義用戶認(rèn)證過程詳解
這篇文章主要介紹了spring Security的自定義用戶認(rèn)證過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09