java并發(fā)使用CountDownLatch在生產(chǎn)環(huán)境翻車剖析
前言
大家好,我是小郭,之前分享了CountDownLatch的使用,我們知道用來控制并發(fā)流程的同步工具,主要的作用是為了等待多個(gè)線程同時(shí)完成任務(wù)后,在進(jìn)行主線程任務(wù)。
萬萬沒想到,在生產(chǎn)環(huán)境中竟然翻車了,因?yàn)闆]有考慮到一些場(chǎng)景,導(dǎo)致了CountDownLatch出現(xiàn)了問題,接下來來分享一下由于CountDownLatch導(dǎo)致的問題。
【線程】并發(fā)流程控制的同步工具-CountDownLatch
需求背景
先簡(jiǎn)單介紹下業(yè)務(wù)場(chǎng)景,針對(duì)用戶批量下載的文件進(jìn)行修改上傳
為了提高執(zhí)行的速度,所以在采用線程池去執(zhí)行 下載-修改-上傳 的操作,并在全部執(zhí)行完之后統(tǒng)一提交保存文件地址到數(shù)據(jù)庫,于是加入了CountDownLatch來進(jìn)行控制。
具體實(shí)現(xiàn)
根據(jù)服務(wù)本身情況,自定義一個(gè)線程池
public static ExecutorService testExtcutor() { return new ThreadPoolExecutor( 2, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1)); }
模擬執(zhí)行
public static void main(String[] args) { // 下載文件總數(shù) List<Integer> resultList = new ArrayList<>(100); IntStream.range(0,100).forEach(resultList::add); // 下載文件分段 List<List<Integer>> split = CollUtil.split(resultList, 10); ExecutorService executorService = BaseThreadPoolExector.testExtcutor(); CountDownLatch countDownLatch = new CountDownLatch(100); for (List<Integer> list : split) { executorService.execute(() -> { list.forEach(i ->{ try { // 模擬業(yè)務(wù)操作 Thread.sleep(500); System.out.println("任務(wù)進(jìn)入"); } catch (InterruptedException e) { e.printStackTrace(); System.out.println(e.getMessage()); } finally { System.out.println(countDownLatch.getCount()); countDownLatch.countDown(); } }); }); } try { countDownLatch.await(); System.out.println("countDownLatch.await()"); } catch (InterruptedException e) { e.printStackTrace(); } }
一開始我個(gè)人感覺沒有什么問題,反正finally都能夠做減一的操作,到最后調(diào)用await方法,進(jìn)行主線程任務(wù)
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at Thread.executor.executorTestBlock.main(executorTestBlock.java:28) 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown 任務(wù)進(jìn)入 countDownLatch.countDown
由于任務(wù)數(shù)量較多,阻塞隊(duì)列中已經(jīng)塞滿了,所以默認(rèn)的拒絕策略,當(dāng)隊(duì)列滿時(shí),處理策略報(bào)錯(cuò)異常,
要注意這個(gè)異常是線程池,自己拋出的,不是我們循環(huán)里面打印出來的,
這也造成了,線上這個(gè)線程池被阻塞了,他永遠(yuǎn)也調(diào)用不到await方法,
利用jstack,我們就能夠看到有問題
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007ff6198b7000 nid=0xa903 waiting on condition [0x0000700001c64000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) "pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007ff6198b6800 nid=0x5903 waiting on condition [0x0000700001b61000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
解決方案
調(diào)大阻塞隊(duì)列,但是問題來了,到底多少阻塞隊(duì)列才是大呢,如果太大了會(huì)不由又造成內(nèi)存溢出等其他的問題
在第一個(gè)的基礎(chǔ)上,我們修改了拒絕策略,當(dāng)觸發(fā)拒絕策略的時(shí)候,用調(diào)用者所在的線程來執(zhí)行任務(wù)
public static ThreadPoolExecutor queueExecutor(BlockingQueue<Runnable> workQueue){ return new ThreadPoolExecutor( size, size, 0L, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.CallerRunsPolicy()); }
你可能又會(huì)想說,會(huì)不會(huì)任務(wù)數(shù)量太多,導(dǎo)致調(diào)用者所在的線程執(zhí)行不過來,任務(wù)提交的性能急劇下降
那我們就應(yīng)該自定義拒絕策略,將這下排隊(duì)的消息記錄下來,采用補(bǔ)償機(jī)制的方式去執(zhí)行
同時(shí)也要注意上面的那個(gè)異常是線程池拋出來的,我們自己也需要將線程池進(jìn)行try catch,記錄問題數(shù)據(jù),并且在finally中執(zhí)行countDownLatch.countDown來避免,線程池的使用
總結(jié)
目前根據(jù)業(yè)務(wù)部門的反饋,業(yè)務(wù)實(shí)際中任務(wù)數(shù)不很特別多的情況,所以暫時(shí)先采用了第二種方式去解決這個(gè)線上問題
在這里我們也可以看到,如果沒有正確的關(guān)閉countDownLatch,可能會(huì)導(dǎo)致一直等待,這也是我們需要注意的。
工具雖然好,但是依然要注意他帶來的問題,沒有正確的去處理好,引發(fā)的一系列連鎖反應(yīng)。
以上就是java并發(fā)使用CountDownLatch在生產(chǎn)環(huán)境翻車剖析的詳細(xì)內(nèi)容,更多關(guān)于java并發(fā)CountDownLatch的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java藍(lán)橋杯實(shí)現(xiàn)線段和點(diǎn)
本文主要介紹Java藍(lán)橋杯實(shí)現(xiàn)線段和點(diǎn)的內(nèi)容,感興趣的小伙伴可以參考下文2021-08-08SpringCloud OpenFeign基本介紹與實(shí)現(xiàn)示例
OpenFeign源于Netflix的Feign,是http通信的客戶端。屏蔽了網(wǎng)絡(luò)通信的細(xì)節(jié),直接面向接口的方式開發(fā),讓開發(fā)者感知不到網(wǎng)絡(luò)通信細(xì)節(jié)。所有遠(yuǎn)程調(diào)用,都像調(diào)用本地方法一樣完成2023-02-02SpringBoot集成cache緩存的實(shí)現(xiàn)
日常開發(fā)中,緩存是解決數(shù)據(jù)庫壓力的一種方案,本文記錄springboot中使用cache緩存。需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-06-06java實(shí)現(xiàn)水果超市管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)水果超市管理系統(tǒng),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01SpringBoot利用Redis解決海量重復(fù)提交問題
本文主要介紹了SpringBoot利用Redis解決海量重復(fù)提交問題,介紹了三種常見的解決方案,包括使用Redis計(jì)數(shù)器,使用Redis分布式鎖和使用Redis發(fā)布/訂閱機(jī)制,感興趣的可以了解一下2024-03-03