java并發(fā)使用CountDownLatch在生產(chǎn)環(huán)境翻車剖析
前言
大家好,我是小郭,之前分享了CountDownLatch的使用,我們知道用來控制并發(fā)流程的同步工具,主要的作用是為了等待多個線程同時完成任務(wù)后,在進行主線程任務(wù)。
萬萬沒想到,在生產(chǎn)環(huán)境中竟然翻車了,因為沒有考慮到一些場景,導(dǎo)致了CountDownLatch出現(xiàn)了問題,接下來來分享一下由于CountDownLatch導(dǎo)致的問題。
【線程】并發(fā)流程控制的同步工具-CountDownLatch
需求背景
先簡單介紹下業(yè)務(wù)場景,針對用戶批量下載的文件進行修改上傳

為了提高執(zhí)行的速度,所以在采用線程池去執(zhí)行 下載-修改-上傳 的操作,并在全部執(zhí)行完之后統(tǒng)一提交保存文件地址到數(shù)據(jù)庫,于是加入了CountDownLatch來進行控制。
具體實現(xiàn)
根據(jù)服務(wù)本身情況,自定義一個線程池
public static ExecutorService testExtcutor() {
return new ThreadPoolExecutor(
2,
2,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1));
}
模擬執(zhí)行
public static void main(String[] args) {
// 下載文件總數(shù)
List<Integer> resultList = new ArrayList<>(100);
IntStream.range(0,100).forEach(resultList::add);
// 下載文件分段
List<List<Integer>> split = CollUtil.split(resultList, 10);
ExecutorService executorService = BaseThreadPoolExector.testExtcutor();
CountDownLatch countDownLatch = new CountDownLatch(100);
for (List<Integer> list : split) {
executorService.execute(() -> {
list.forEach(i ->{
try {
// 模擬業(yè)務(wù)操作
Thread.sleep(500);
System.out.println("任務(wù)進入");
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(e.getMessage());
} finally {
System.out.println(countDownLatch.getCount());
countDownLatch.countDown();
}
});
});
}
try {
countDownLatch.await();
System.out.println("countDownLatch.await()");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
一開始我個人感覺沒有什么問題,反正finally都能夠做減一的操作,到最后調(diào)用await方法,進行主線程任務(wù)
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at Thread.executor.executorTestBlock.main(executorTestBlock.java:28) 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown 任務(wù)進入 countDownLatch.countDown
由于任務(wù)數(shù)量較多,阻塞隊列中已經(jīng)塞滿了,所以默認的拒絕策略,當隊列滿時,處理策略報錯異常,
要注意這個異常是線程池,自己拋出的,不是我們循環(huán)里面打印出來的,
這也造成了,線上這個線程池被阻塞了,他永遠也調(diào)用不到await方法,
利用jstack,我們就能夠看到有問題
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007ff6198b7000 nid=0xa903 waiting on condition [0x0000700001c64000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) "pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007ff6198b6800 nid=0x5903 waiting on condition [0x0000700001b61000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
解決方案
調(diào)大阻塞隊列,但是問題來了,到底多少阻塞隊列才是大呢,如果太大了會不由又造成內(nèi)存溢出等其他的問題
在第一個的基礎(chǔ)上,我們修改了拒絕策略,當觸發(fā)拒絕策略的時候,用調(diào)用者所在的線程來執(zhí)行任務(wù)
public static ThreadPoolExecutor queueExecutor(BlockingQueue<Runnable> workQueue){
return new ThreadPoolExecutor(
size,
size,
0L,
TimeUnit.SECONDS,
workQueue,
new ThreadPoolExecutor.CallerRunsPolicy());
}
你可能又會想說,會不會任務(wù)數(shù)量太多,導(dǎo)致調(diào)用者所在的線程執(zhí)行不過來,任務(wù)提交的性能急劇下降
那我們就應(yīng)該自定義拒絕策略,將這下排隊的消息記錄下來,采用補償機制的方式去執(zhí)行
同時也要注意上面的那個異常是線程池拋出來的,我們自己也需要將線程池進行try catch,記錄問題數(shù)據(jù),并且在finally中執(zhí)行countDownLatch.countDown來避免,線程池的使用
總結(jié)
目前根據(jù)業(yè)務(wù)部門的反饋,業(yè)務(wù)實際中任務(wù)數(shù)不很特別多的情況,所以暫時先采用了第二種方式去解決這個線上問題
在這里我們也可以看到,如果沒有正確的關(guān)閉countDownLatch,可能會導(dǎo)致一直等待,這也是我們需要注意的。
工具雖然好,但是依然要注意他帶來的問題,沒有正確的去處理好,引發(fā)的一系列連鎖反應(yīng)。
以上就是java并發(fā)使用CountDownLatch在生產(chǎn)環(huán)境翻車剖析的詳細內(nèi)容,更多關(guān)于java并發(fā)CountDownLatch的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringCloud OpenFeign基本介紹與實現(xiàn)示例
OpenFeign源于Netflix的Feign,是http通信的客戶端。屏蔽了網(wǎng)絡(luò)通信的細節(jié),直接面向接口的方式開發(fā),讓開發(fā)者感知不到網(wǎng)絡(luò)通信細節(jié)。所有遠程調(diào)用,都像調(diào)用本地方法一樣完成2023-02-02
SpringBoot利用Redis解決海量重復(fù)提交問題
本文主要介紹了SpringBoot利用Redis解決海量重復(fù)提交問題,介紹了三種常見的解決方案,包括使用Redis計數(shù)器,使用Redis分布式鎖和使用Redis發(fā)布/訂閱機制,感興趣的可以了解一下2024-03-03

