關(guān)于Java如何用好線程池的方法分享(建議收藏)
1. 線程的使用場景
異步任務
簡單來說就是某些不需要同步返回業(yè)務處理結(jié)果的場景,比如:短信、郵件等通知類業(yè)務,評論、點贊等互動性業(yè)務。
并行計算
就像MapReduce一樣,充分利用多線程的并行計算能力,將大任務拆分為多個子任務,最后再將所有子任務計算后的結(jié)果進行匯總,F(xiàn)orkJoinPool就是JDK中典型的并行計算框架。
串行任務
很簡單,假設(shè)某個方法需要經(jīng)過,A、B、C三個步驟,A步驟耗時1秒,B步驟耗時2秒,C步驟耗時3秒,那么如果是串行處理,則該方法最終需要耗時6秒,但如果A、B、C三個步驟互相之間是沒有依賴的,那么就可以利用多線程的方式,同時處理三個步驟,這樣該方法只需要等待耗時最長的步驟結(jié)束即可。
2. 線程池創(chuàng)建
不要直接使用Executors創(chuàng)建線程池,應通過ThreadPoolExecutor的方式,主動明確線程池的參數(shù),避免產(chǎn)生意外。
每個參數(shù)都要顯示設(shè)置,例如像下面這樣:
private static final ExecutorService executor = new ThreadPoolExecutor( 2, 4, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("common-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
3. 參數(shù)的配置建議
CorePoolSize(核心線程數(shù))
一般在配置核心線程數(shù)的時候,是需要結(jié)合線程池將要處理任務的特性來決定的,而任務的性質(zhì)一般可以劃分為:CPU密集型、I/O密集型。
比較通用的配置方式如下
CPU密集型:一般建議線程的核心數(shù)與CPU核心數(shù)保持一致。 I/O密集型:一般可以設(shè)置2倍的CPU核心數(shù)的線程數(shù),因為此類任務CPU比較空閑,可以多分配點線程充分利用CPU資源來提高效率。
通過Runtime.getRuntime().availableProcessors()可以獲取核心線程數(shù)。
另外還有一個公式可以借鑒
線程核心數(shù) = cpu核心數(shù) / (1-阻塞系數(shù))
阻塞系數(shù) = 阻塞時間/(阻塞時間+使用CPU的時間)
實際上大多數(shù)線上業(yè)務所消耗的時間主要就是I/O等待,因此一般線程數(shù)都可以設(shè)置的多一點,比如tomcat中默認的線程數(shù)就是200,所以最佳的核心線程數(shù)是需要根據(jù)特定場景,然后通過實際上線上允許結(jié)果分析后,再不斷的進行調(diào)整。
MaximumPoolSize
maximumPoolSize的設(shè)置也是看實際應用場景,如果設(shè)置的和corePoolSize一樣,那就完全依靠阻塞隊列和拒絕策略來控制任務的處理情況,如果設(shè)置的比corePoolSize稍微大一點,那可能對于一些突然流量的場景更使用。
KeepAliveTime
由maximumPoolSize創(chuàng)建出來的線程,在經(jīng)過keepAliveTime時間后進行銷毀,依舊突發(fā)流量持續(xù)的時間來決定。
WorkQueue
那么阻塞隊列應該設(shè)置多大呢?我們知道當線程池中所有的線程都在工作時,如果再有任務進來,就會被放到阻塞隊列中等待,如果阻塞隊列設(shè)置的太小,可能很快隊列就滿了,導致任務被丟棄或者異常(由拒絕策略決定),如果隊列設(shè)置的太大,又可能會帶來內(nèi)存資源的緊張,甚至OOM,以及任務延遲時間過長。
所以阻塞隊列的大小,又是要結(jié)合實際場景來設(shè)置的。
一般會根據(jù)處理任務的速度與任務產(chǎn)生的速度進行計算得到一個大概的數(shù)值。
假設(shè)現(xiàn)在有1個線程,每秒鐘可以處理10個任務,正常情況下每秒鐘產(chǎn)生的任務數(shù)小于10,那么此時隊列長度為10就足以。 但是如果高峰時期,每秒產(chǎn)生的任務數(shù)會達到20,會持續(xù)10秒,且任務又不希望丟棄,那么此時隊列的長度就需要設(shè)置到100。
監(jiān)控workQueue中等待任務的數(shù)量是非常重要的,只有了解實際的情況,才能做出正確的決定。
ThreadFactory
通過threadFactory我們可以自定義線程組的名字,設(shè)置合理的名稱將有利于你線上進行問題排查。
Handler
最后拒絕策略,這也是要結(jié)合實際的業(yè)務場景來決定采用什么樣的拒絕方式,例如像過程類的數(shù)據(jù),可以直接采用DiscardOldestPolicy策略。
常見的拒絕策略
AbortPolicy
JDK自帶線程池中默認的拒絕策略,直接拒絕任務并拋出異常。
CallerRunsPolicy
由當前調(diào)用調(diào)用者繼續(xù)執(zhí)行當前任務。
DiscardPolicy
直接丟棄當前任務
DiscardOldestPolicy
丟棄阻塞隊列中最早丟進去的任務
其他的拒絕策略
NewThreadRunsPolicy
這是Netty中的拒絕策略,和CallerRunsPolicy有點像,任務不會丟棄,不同的是Netty中是新建了一個線程繼續(xù)執(zhí)行當前任務。
AbortPolicyWithReport
dubbo中的拒絕策略,也是拋出異常,不同的時對于日志內(nèi)容的輸出更加豐富,也是為了我們更好的排查問題。
EsAbortPolicy
針對某種特定場景時,做出不同的處理方式,比如在elasticsearch中只有當isForceExecution為true(isForceExecution是用來判定任務是執(zhí)行還是拒絕的條件),且阻塞隊列是SizeBlockingQueue類型時,才會放入當前隊列中,否則拋出異常。
4. 線程池的任務處理流程
阻塞隊列的設(shè)計起到了良好的緩沖作用,當面對突發(fā)流量到來時,先將任務丟到隊列中,再慢慢來消費,其原理和MQ是類似的,一旦隊列也被打滿了,則說明消費能力與你的期望對比,已經(jīng)嚴重不足了,此時maximumPoolSize參數(shù)的設(shè)計,又給了你一次處理的機會,你可以選擇再開啟一部分線程來應對突發(fā)狀況,當危機接觸后,再主動幫你回收這部分線程,或者選擇使用拒絕策略。
一個簡單的任務處理,考慮各種實際運行中可能遇到的情況,對于線程池的使用者來說,也應了解線程池的任務處理流程,再結(jié)合自身的業(yè)務場景充分考慮其中的參數(shù)設(shè)置。
5. 線程的狀態(tài)
Java中對線程的定義有如下幾種狀態(tài):RUNNABLE, BLOCKED, WAITING, TIMED\_WAITING, NEW, TERMINATED
RUNNABLE
可運行的狀態(tài),包含了運行中和準備就緒兩種狀態(tài),也就是說RUNNABLE狀態(tài)下線程并不一定已經(jīng)運行了,可能還在等待CPU資源。
BLOCKED
處于阻塞狀態(tài)下的線程,并且這個阻塞是因為進入了同步代碼塊或者方法,需要等待鎖的釋放。
一旦線上出現(xiàn)blocked狀態(tài)的線程,是需要排查原因的。
WAITING
處于等待狀態(tài)下的線程,例如一個線程調(diào)用了Object.wait()方法,那么這個線程就會等待另一個線程調(diào)用Object.notify()或者Object.notifyAll()。 或者調(diào)用thread.join()的線程等待指定線程的終止。
常見的方法有:Object.wait()、Thread.join()、LockSupport.park()
TIMED_WAITING
與WAITING的區(qū)別就在于TIMED_WAITING是明確帶有具體等待時間的,常見的方法有:Thread.sleep(long)、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos(long)、LockSupport.parkUntil(long)
NEW
創(chuàng)建了一個線程,但還沒調(diào)用start()方法。
TERMINATED
終止狀態(tài),表示線程已經(jīng)執(zhí)行完畢。
6. 線程池的監(jiān)控
線程池自身提供的統(tǒng)計數(shù)據(jù)
public class ThreadPoolMonitor { private final static Logger log = LoggerFactory.getLogger(ThreadPoolMonitor.class); private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("my_thread_pool_%d").build()); public static void main(String[] args) { log.info("Pool Size: " + threadPool.getPoolSize()); log.info("Active Thread Count: " + threadPool.getActiveCount()); log.info("Task Queue Size: " + threadPool.getQueue().size()); log.info("Completed Task Count: " + threadPool.getCompletedTaskCount()); } }
通過micrometer API完成統(tǒng)計,這樣就可以接入Prometheus了
package com.springboot.micrometer.monitor; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.micrometer.core.instrument.Metrics; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @Component public class ThreadPoolMonitor { private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("my_thread_pool_%d").build(), new ThreadPoolExecutor.DiscardOldestPolicy()); /** * 活躍線程數(shù) */ private AtomicLong activeThreadCount = new AtomicLong(0); /** * 隊列任務數(shù) */ private AtomicLong taskQueueSize = new AtomicLong(0); /** * 完成任務數(shù) */ private AtomicLong completedTaskCount = new AtomicLong(0); /** * 線程池中當前線程的數(shù)量 */ private AtomicLong poolSize = new AtomicLong(0); @PostConstruct private void init() { /** * 通過micrometer API完成統(tǒng)計 * * gauge最典型的使用場景就是統(tǒng)計:list、Map、線程池、連接池等集合類型的數(shù)據(jù) */ Metrics.gauge("my_thread_pool_active_thread_count", activeThreadCount); Metrics.gauge("my_thread_pool_task_queue_size", taskQueueSize); Metrics.gauge("my_thread_pool_completed_task_count", completedTaskCount); Metrics.gauge("my_thread_pool_size", poolSize); // 模擬線程池的使用 new Thread(this::runTask).start(); } private void runTask() { // 每5秒監(jiān)控一次線程池的使用情況 monitorThreadPoolState(); // 模擬任務執(zhí)行 IntStream.rangeClosed(0, 500).forEach(i -> { // 每500毫秒,執(zhí)行一個任務 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 每個處理一個任務耗時5秒 threadPool.submit(() -> { try { TimeUnit.MILLISECONDS.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); }); } private void monitorThreadPoolState() { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { activeThreadCount.set(threadPool.getActiveCount()); taskQueueSize.set(threadPool.getQueue().size()); poolSize.set(threadPool.getPoolSize()); completedTaskCount.set(threadPool.getCompletedTaskCount()); }, 0, 5, TimeUnit.SECONDS); } }
以上就是關(guān)于Java如何用好線程池的方法分享(建議收藏)的詳細內(nèi)容,更多關(guān)于Java線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring中自帶的@Schedule實現(xiàn)自動任務的過程解析
這篇文章主要介紹了關(guān)于Spring中自帶的@Schedule實現(xiàn)自動任務,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06httpclient 請求http數(shù)據(jù),json轉(zhuǎn)map的實例
下面小編就為大家?guī)硪黄猦ttpclient 請求http數(shù)據(jù),json轉(zhuǎn)map的實例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-12-12Java mutable對象和immutable對象的區(qū)別說明
這篇文章主要介紹了Java mutable對象和immutable對象的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06Java利用for循環(huán)輸出空心三角形、空心菱形和空心矩形的代碼
今天小編就為大家分享一篇關(guān)于Java利用for循環(huán)輸出空心三角形、空心菱形和空心矩形的代碼,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12springboot中使用ElasticSearch的詳細教程
這篇文章主要介紹了ElasticSearch在springboot中使用的詳細教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-05-05