JAVA線程池監(jiān)控以及動態(tài)調(diào)整示例詳解
1 背景
Java線程池源碼分析 里雖然介紹了線程池的核心配置(核心線程數(shù)、最大線程數(shù)和隊列大小)該如何配置,但是實際上業(yè)界也沒有一個統(tǒng)一的標準。雖然有些所謂的"公式",但是不同的業(yè)務場景復雜多變,配置原則也不盡相同。從實際經(jīng)驗來看,IO密集型、CPU密集型應用在線程配置上就比較懸殊,因此沒有一個通用的適合所有場景的公式。
那么我們換一種思路,就是既然不能明確配置,那么能不能支持動態(tài)配置呢?答案是肯定的,因為線程池本身就支持核心線程數(shù)和最大線程數(shù)的修改,而且是實時生效的。 通常在生產(chǎn)環(huán)境中,我們可以實時監(jiān)控線程池的運行狀態(tài),隨時掌握應用服務的性能狀況,以便在系統(tǒng)資源緊張時及時告警,動態(tài)調(diào)整線程配置,必要時進行人工介入,排查問題,線上修復。
也就是說,通過實時監(jiān)控,然后動態(tài)修改。
2 監(jiān)控
我們知道,線程池使用不當也會使服務器資源枯竭,導致異常情況的發(fā)生,比如固定線程池的阻塞隊列任務數(shù)量過多、緩存線程池創(chuàng)建的線程過多導致內(nèi)存溢出、系統(tǒng)假死等問題。因此,我們需要一種簡單的監(jiān)控方案來監(jiān)控線程池的使用情況,比如完成任務數(shù)量、未完成任務數(shù)量、線程大小等信息。
線程池的監(jiān)控分為2種類型,一種是在執(zhí)行任務前后全量統(tǒng)計任務排隊時間和執(zhí)行時間,另外一種是通過定時任務,定時獲取活躍線程數(shù),隊列中的任務數(shù),核心線程數(shù),最大線程數(shù)等數(shù)據(jù)。
2.1 MonitoredThreadPoolStatisticsExecutor全量統(tǒng)計
參數(shù)名稱 | 說明 |
---|---|
poolName | 線程池的名稱 |
timeout | 預設的任務超時時間閾值 |
taskTimeoutFlag | 是否記錄任務超時次數(shù) |
execTimeout | 任務執(zhí)行超時時間閾值 |
taskExecTimeoutFlag | 是否記錄任務執(zhí)行超時次數(shù) |
waitInQueueTimeout | 任務在隊列中等待的時間閾值 |
taskWaitInQueueTimeoutFlag | 是否記錄任務等待時間超時次數(shù) |
queueSizeWarningPercent | 任務隊列使用率告警閾值 |
queueSizeWarningFlag | 是否進行隊列容量告警 |
queueSizeHasWarningFlag | 是否需要隊列容量告警(隊列是否曾經(jīng)達到過預警值) |
taskTotalTime | 任務總時長,以任務提交時間進行計時,單位 ms |
taskTotalExecTime | 任務總執(zhí)行時長,以任務開始執(zhí)行進行計時,單位 ms |
minTaskTime | 最短任務時長,以提交時間計時,單位 ms |
maxTaskTime | 最長任務時長,以提交時間計時,單位 ms |
taskTimeoutCount | 任務超時次數(shù),以任務提交進行計時 |
taskExecTimeoutCount | 任務執(zhí)行超時次數(shù),以任務開始執(zhí)行時間進行計時 |
taskWaitInQueueTimeoutCount | 任務等待時間超過設定的閾值的次數(shù) |
minTaskExecTime | 最短任務時長,以執(zhí)行時間計時,單位 ms |
maxTaskExecTime | 最長任務時長,以執(zhí)行時間計時,單位 ms |
activeCount | 線程池中正在執(zhí)行任務的線程數(shù)量 |
completedTaskCount | 線程池已完成的任務數(shù)量,該值小于等于taskCount |
corePoolSize | 線程池的核心線程數(shù)量 |
largestPoolSize | 線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過,也就是達到了maximumPoolSize |
maximumPoolSize | 線程池的最大線程數(shù)量 |
poolSize | 線程池當前的線程數(shù)量 |
taskCount | 線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務總數(shù) |
為了簡化,代碼中的監(jiān)控數(shù)據(jù)都是通過日志打印,實際中是通過kafka收集,然后做出可視化監(jiān)控。
/** * 自定義可監(jiān)控的線程池 */ public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean { /** * 線程池的名稱 */ private String poolName; /** * 預設的任務超時時間閾值,用于統(tǒng)計功能。 * 以任務提交時間進行計時,單位 ms,大于0則記錄超時次數(shù)。 */ private long timeout = 120000l; /** * 是否記錄任務超時次數(shù) */ private boolean taskTimeoutFlag = false; /** * 任務執(zhí)行超時時間閾值,用于統(tǒng)計功能。 * 以任務開始執(zhí)行進行計時,單位 ms,大于 0 則記錄任務執(zhí)行超時次數(shù)。 */ private long execTimeout = 120000l; /** * 是否記錄任務執(zhí)行超時次數(shù) */ private boolean taskExecTimeoutFlag = false; /** * 任務在隊列中等待的時間閾值,用于統(tǒng)計功能。 * 以任務提交時間開始計時到開始執(zhí)行為止,單位 ms。 */ private long waitInQueueTimeout = 60000l; /** * 是否記錄任務等待時間超時次數(shù) */ private boolean taskWaitInQueueTimeoutFlag = false; /** * 任務隊列使用率告警閾值 */ private int queueSizeWarningPercent = 80; /** * 是否進行隊列容量告警 */ private boolean queueSizeWarningFlag = false; /** * 是否需要隊列容量告警(隊列是否曾經(jīng)達到過預警值) */ private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false); /** * 任務總時長,用于統(tǒng)計功能。以任務提交時間進行計時,單位 ms */ private AtomicLong taskTotalTime = new AtomicLong(0); /** * 任務總執(zhí)行時長,用于統(tǒng)計功能。以任務開始執(zhí)行進行計時,單位 ms */ private AtomicLong taskTotalExecTime = new AtomicLong(0); /** * 最短任務時長,以提交時間計時,單位 ms */ private long minTaskTime = Long.MAX_VALUE; /** * 最長任務時長,以提交時間計時,單位 ms */ private long maxTaskTime = 0; /** * 任務超時次數(shù),以任務提交進行計時 */ private AtomicLong taskTimeoutCount = new AtomicLong(0); /** * 任務執(zhí)行超時次數(shù),以任務開始執(zhí)行時間進行計時 */ private AtomicLong taskExecTimeoutCount = new AtomicLong(0); /** * 任務等待時間超過設定的閾值的次數(shù) */ private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0); /** * 最短任務時長,以執(zhí)行時間計時,單位 ms */ private long minTaskExecTime = Long.MAX_VALUE; /** * 最長任務時長,以執(zhí)行時間計時,單位 ms */ private long maxTaskExecTime = 0; /** * 保存任務信息 */ private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>(); public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler); this.poolName = poolName; this.timeout = timeout; this.execTimeout = execTimeout; this.waitInQueueTimeout = waitInQueueTimeout; this.queueSizeWarningPercent = queueSizeWarningPercent; if (this.timeout > 0) { this.taskTimeoutFlag = true; } if (this.execTimeout > 0) { this.taskExecTimeoutFlag = true; } if (this.waitInQueueTimeout > 0) { this.taskWaitInQueueTimeoutFlag = true; } if (this.queueSizeWarningPercent > 0) { this.queueSizeWarningFlag = true; } ThreadPoolMonitor.monitor(this); } public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName)); this.poolName = poolName; this.timeout = timeout; this.execTimeout = execTimeout; this.waitInQueueTimeout = waitInQueueTimeout; this.queueSizeWarningPercent = queueSizeWarningPercent; if (this.timeout > 0) { this.taskTimeoutFlag = true; } if (this.execTimeout > 0) { this.taskExecTimeoutFlag = true; } if (this.waitInQueueTimeout > 0) { this.taskWaitInQueueTimeoutFlag = true; } if (this.queueSizeWarningPercent > 0) { this.queueSizeWarningFlag = true; } ThreadPoolMonitor.monitor(this); } @Override public void execute(Runnable command) { this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics()); if (this.queueSizeWarningFlag) { float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity()); BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP); int usedPercent = bd.multiply(new BigDecimal(100)).intValue(); if (usedPercent > this.queueSizeWarningPercent) { this.queueSizeHasWarningFlag.set(true); System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity()); } } super.execute(command); } @Override protected void beforeExecute(Thread t, Runnable r) { TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode())); if (null != taskStatistics) { taskStatistics.setStartExecTime(System.currentTimeMillis()); } super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { //重寫此方法做一些統(tǒng)計功能 long endTime = System.currentTimeMillis(); TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode())); if (null != taskStatistics) { long taskTotalTime = endTime - taskStatistics.getCommitTime(); long taskExecTime = endTime - taskStatistics.getStartExecTime(); long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime(); this.taskTotalTime.addAndGet(taskTotalTime); this.taskTotalExecTime.addAndGet(taskExecTime); if (this.minTaskTime > taskTotalTime) { this.minTaskTime = taskTotalTime; } if (this.maxTaskTime < taskTotalTime) { this.maxTaskTime = taskTotalTime; } if (this.taskTimeoutFlag && taskTotalTime > this.timeout) { this.taskTimeoutCount.incrementAndGet(); } if (this.minTaskExecTime > taskExecTime) { this.minTaskExecTime = taskExecTime; } if (this.maxTaskExecTime < taskExecTime) { this.maxTaskExecTime = taskExecTime; } if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) { this.taskExecTimeoutCount.incrementAndGet(); } if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) { this.taskWaitInQueueTimeoutCount.incrementAndGet(); } System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]"); // 初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務數(shù)量、 // 已完成任務數(shù)量、任務總數(shù)、隊列里緩存的任務數(shù)量、池中存在的最大線程數(shù)、 // 最大允許的線程數(shù)、線程空閑時間、線程池是否關閉、線程池是否終止 LOGGER.info("{}-pool-monitor: " + " PoolSize: {}, CorePoolSize: {}, Active: {}, " + "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " + "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); } super.afterExecute(r, t); } /** * Spring容器管理線程池的生命周期,線程池Bean銷毀之前先關閉 * @throws Exception */ @Override public void destroy() throws Exception { shutdown(); } /** * 線程池延遲關閉時(等待線程池里的任務都執(zhí)行完畢),統(tǒng)計線程池情況 */ @Override public void shutdown() { // 統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); } /** * 線程池立即關閉時,統(tǒng)計線程池情況 */ @Override public List<Runnable> shutdownNow() { // 統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); } /** * 任務平均時長,無已完成任務時,返回 0 */ public long getTaskAvgTime() { if (this.getCompletedTaskCount() > 0) { return this.getTaskTotalTime().get() / this.getCompletedTaskCount(); } return 0; } /** * 任務平均執(zhí)行時長,無已完成任務時,返回 0 */ public long getTaskAvgExecTime() { if (this.getCompletedTaskCount() > 0) { return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount(); } return 0; } //省略setter/getter方法 }
public class TaskStatistics { /** * 任務提交時間 */ private long commitTime; /** * 任務開始執(zhí)行時間 */ private long startExecTime; public TaskStatistics() { this.commitTime = System.currentTimeMillis(); } }
方法 | 含義 |
---|---|
shutdown() | 線程池延遲關閉時(等待線程池里的任務都執(zhí)行完畢),統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量 |
shutdownNow() | 線程池立即關閉時,統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量 |
beforeExecute(Thread t, Runnable r) | 任務執(zhí)行之前,記錄任務開始時間,startTimes這個HashMap以任務的hashCode為key,開始時間為值 |
afterExecute(Runnable r, Throwable t) | 任務執(zhí)行之后,計算任務結束時間。統(tǒng)計任務耗時、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務數(shù)量、已完成任務數(shù)量、任務總數(shù)、隊列里緩存的任務數(shù)量、池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時間、線程池是否關閉、線程池是否終止信息 |
注意事項:
- 在 afterExecute 方法中需要注意,需要調(diào)用 ConcurrentHashMap 的 remove 方法移除并返回任務的開始時間信息,而不是調(diào)用 get 方法,因為在高并發(fā)情況下,線程池里要執(zhí)行的任務很多,如果只獲取值不移除的話,會使 ConcurrentHashMap 越來越大,引發(fā)內(nèi)存泄漏或溢出問題。
2.2 定時采集
public class ThreadPoolMonitor { private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>(); private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor")); private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L; public ThreadPoolMonitor() { } public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) { if (threadPoolExecutor instanceof MonitoredThreadPoolStatisticsExecutor) { throw new IllegalArgumentException("MonitoredThreadPoolStatisticsExecutor is already monitored."); } else { monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS); } } public static void remove(String name) { ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name); if (futureWrapper != null) { futureWrapper.future.cancel(false); } } public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) { ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name); if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) { POOL_TASK_FUTURE_MAP.remove(name, futureWrapper); futureWrapper.future.cancel(false); } } static void monitor(MonitoredThreadPoolStatisticsExecutor threadPoolExecutor) { monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS); } private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) { PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name); POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> { if (v == null) { return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor); } else { throw new IllegalStateException("duplicate pool name: " + name); } }); } static { Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown)); } static class FutureWrapper { private final Future<?> future; private final ThreadPoolExecutor threadPoolExecutor; public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) { this.future = future; this.threadPoolExecutor = threadPoolExecutor; } } }
public class PoolMonitorTask implements Runnable { private final ThreadPoolExecutor monitoredThreadPool; private final String poolName; private volatile long lastTaskCount = 0L; public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) { this.monitoredThreadPool = monitoredThreadPool; this.poolName = poolName; } @Override public void run() { int activeCount = this.monitoredThreadPool.getActiveCount(); int corePoolSize = this.monitoredThreadPool.getCorePoolSize(); int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize(); int queueTaskSize = this.monitoredThreadPool.getQueue().size(); long taskCount = this.monitoredThreadPool.getTaskCount(); int executedTask = (int) (taskCount - this.lastTaskCount); log.info("線程池名稱 = {}, 活躍線程數(shù)峰值 = {}, 隊列任務數(shù)峰值 = {}, 核心線程數(shù) = {}, 最大線程數(shù) = {}, 執(zhí)行的任務總數(shù) = {}", this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask); this.lastTaskCount = taskCount; if (this.monitoredThreadPool.isTerminated()) { ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool); } } }
2.3 可視化
通過Kafka獲取到監(jiān)控數(shù)據(jù)后,可以做一個可視化頁面,比如可以展示下面這這些數(shù)據(jù)
active/coreSize :活動線程數(shù)和核心線程數(shù)的比值, 其中active = executor.getActiveCount(),表示所有運行中的工作線程的數(shù)量,這個比值反應線程池的線程活躍狀態(tài),如果一直維持在一個很低的水平,則說明線程池需要進行縮容;如果長時間維持一個很大的數(shù)值,說明活躍度好,線程池利用率高。
active/maxSize :活動線程數(shù)和最大線程數(shù)的比值,這個值可以配合上面的 active/coreSize 來看,當active/coreSize大于100%的時候,如果active/maxSize維持在一個較低的值,則說明當前線程池的負載偏低,如果大于60%或者更高,則說明線程池過載,需要及時調(diào)整線程池容量配置。
completedTaskCount:執(zhí)行完畢的工作線程的總數(shù),包含歷史所有。
largestPoolSize:歷史上線程池容量觸達過的最大值
rejectCount:被拒絕的線程的數(shù)量,如果大量線程被拒絕,則說明當前線程池已經(jīng)溢出了,需要及時調(diào)整線程池配置
queueSize:隊列中工作線程的數(shù)量,如果大量的線程池在排隊,說明coreSize已經(jīng)不夠用了,可以根據(jù)實際情況來調(diào)整,對于執(zhí)行時間要求很嚴格的業(yè)務場景,可能需要通過提升coreSize來減少排隊情況。
3 動態(tài)調(diào)整線程池
配置線程池的大小可根據(jù)以下幾個維度進行分析來配置合理的線程數(shù):
任務性質可分為:CPU密集型任務,IO密集型任務,混合型任務,任務的執(zhí)行時長,任務是否有依賴——依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接等。
1、CPU密集型任務 盡量使用較小的線程池,一般為CPU核數(shù)+1。 因為CPU密集型任務使得CPU使用率很高,若開過多的線程數(shù),只能增加上下文切換的次數(shù),因此會帶來額外的開銷。
2、IO密集型任務 可以使用稍大的線程池,一般為2*CPU核數(shù)+1。 因為IO操作不占用CPU,不要讓CPU閑下來,應加大線程數(shù)量,因此可以讓CPU在等待IO的時候去處理別的任務,充分利用CPU時間。
3、混合型任務 可以將任務分成IO密集型和CPU密集型任務,然后分別用不同的線程池去處理。 只要分完之后兩個任務的執(zhí)行時間相差不大,那么就會比串行執(zhí)行來的高效。 因為如果劃分之后兩個任務執(zhí)行時間相差甚遠,那么先執(zhí)行完的任務就要等后執(zhí)行完的任務,最終的時間仍然取決于后執(zhí)行完的任務,而且還要加上任務拆分與合并的開銷,得不償失
4、依賴其他資源 如某個任務依賴數(shù)據(jù)庫的連接返回的結果,這時候等待的時間越長,則CPU空閑的時間越長,那么線程數(shù)量應設置得越大,才能更好的利用CPU。
小結:線程等待時間所占比例越高,需要越多線程。線程CPU時間所占比例越高,需要越少線程。
但是實踐發(fā)現(xiàn),盡管我們經(jīng)過謹慎的評估,仍然不能夠保證一次計算出來合適的參數(shù),那么我們是否可以將修改線程池參數(shù)的成本降下來,這樣至少可以發(fā)生故障的時候可以快速調(diào)整從而縮短故障恢復的時間呢? 基于這個思考,我們是否可以將線程池的參數(shù)從代碼中遷移到分布式配置中心上,實現(xiàn)線程池參數(shù)可動態(tài)配置和即時生效,線程池參數(shù)動態(tài)化前后的參數(shù)修改流程對比如下:
3.1 修改參數(shù)
實際應用中主要有下列參數(shù)可以支持動態(tài)修改。
線程池參數(shù) | 說明 |
---|---|
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
queueCapacity | 等待隊列大小 |
timeout | 任務超時時間告警閾值 |
execTimeout | 任務執(zhí)行超時時間告警閾值 |
queuedTaskWarningSize | 等待隊列排隊數(shù)量告警閾值 |
checkInterval | 線程池定時監(jiān)控時間間隔 |
autoExtend | 是否自動擴容 |
其中的corePoolSize、maximumPoolSize都可以使用ThreadPoolExecutor提供的api實現(xiàn): public void setCorePoolSize(int corePoolSize) public void setMaximumPoolSize(int maximumPoolSize)
從ThreadPoolExecutor源碼中可知,
設置新的核心線程數(shù)時, 如果設置的新值小于當前值,多余的現(xiàn)有線程將在下一次空閑時終止,如果新設置的corePoolSize值更大,將在需要時啟動新線程來執(zhí)行任何排隊的任務;
設置新的最大線程數(shù)時,如果新值小于當前值,多余的現(xiàn)有線程將在下一次空閑時終止。
ThreadPoolExecutor沒有提供直接修改等待隊列大小的api。這就需要我們自定義一個可以修改容量的隊列。其實很簡單,只要把jdk原生的隊列中的容量設置為可以修改,并提供修改方法即可。 比如把jdk中的LinkedBlockingQueue拷貝一份,命名為CapacityResizableLinkedBlockingQueue。 將其capacity的屬性變?yōu)榭勺兊?,并提供set方法:
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; //將上述原生代碼改為: private volatile int capacity; public void setCapacity(int capacity) { this.capacity = capacity; }
3.2 配置監(jiān)聽
可以通過配置中心的動態(tài)加載來處理,以Apollo為例,我們可以利用Apollo的ChangeListener來實現(xiàn)對配置變更的監(jiān)聽,(如果是MySQL,可以修改完配置后直接同過HTTP接口通知客戶端進行配置刷新),代碼片段如下:
public class ThreadPoolConfigUpdateListener { @Value("${apollo.bootstrap.namespaces:application}") private String namespace; @Autowired private DynamicThreadPoolFacade dynamicThreadPoolManager; @Autowired private DynamicThreadPoolProperties poolProperties; @PostConstruct public void init() { initConfigUpdateListener(); } public void initConfigUpdateListener() { String apolloNamespace = namespace; if (StringUtils.hasText(poolProperties.getApolloNamespace())) { apolloNamespace = poolProperties.getApolloNamespace(); } String finalApolloNamespace = apolloNamespace; Config config = ConfigService.getConfig(finalApolloNamespace); config.addChangeListener(changeEvent -> { try { Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000); } catch (InterruptedException e) { log.error("配置刷新異常",e); } dynamicThreadPoolManager.refreshThreadPoolExecutor(); log.info("線程池配置有變化,刷新完成"); }); } }
線程池配置的刷新的邏輯如下:
public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) { try { dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> { ThreadPoolMonitor threadPoolExecutor =ThreadPoolMonitor.POOL_TASK_FUTURE_MAP.get(poolProperties.getThreadPoolName()).getThreadPoolExecutor(); executor.setCorePoolSize(poolProperties.getCorePoolSize()); executor.setMaxPoolSize(poolProperties.getMaximumPoolSize()); executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime()); }); }catch(Exception e){ log.error("Executor 參數(shù)設置異常",e); } }
3.4 后臺管理
當然可以通過管理后臺來動態(tài)修改,如下圖,參數(shù)可保存在mysql
最后,一個簡單的線程池監(jiān)控以及動態(tài)修改架構如下圖:
4 線程池如何保持有序/串行
線程池當然是無序的,但是如果我們需要像kafka那樣分區(qū)有序怎么辦呢?
思路:建立多個只有一個線程的線程池,然后按照不同key 在 不同線程池上執(zhí)行,這樣key就相當于kafka的分區(qū),只要key相同,那么提交的任務就會有序執(zhí)行。
demo如下
類似的功能在git有大神開源了,可以去參考一下 https://github.com/PhantomThief/more-lambdas-java
以上就是JAVA線程池監(jiān)控以及動態(tài)調(diào)整示例詳解的詳細內(nèi)容,更多關于JAVA線程池監(jiān)控動態(tài)調(diào)整的資料請關注腳本之家其它相關文章!
相關文章
詳解IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman
HTTP Client 是 IDEA 自帶的一款簡潔輕量級的接口調(diào)用插件,通過它,我們能在 IDEA 上開發(fā),調(diào)試,測試 RESTful Web 服務,接下來通過本文給大家分享IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman的知識,感興趣的朋友一起看看吧2021-05-05springboot yml中profiles的巧妙用法(小白必看多環(huán)境配置)
這篇文章主要介紹了springboot yml中profiles的巧妙用法,非常適合多環(huán)境配置場景,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn)
這篇文章主要介紹了Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-12-12