JAVA線程池監(jiān)控以及動(dòng)態(tài)調(diào)整示例詳解
1 背景
Java線程池源碼分析 里雖然介紹了線程池的核心配置(核心線程數(shù)、最大線程數(shù)和隊(duì)列大?。┰撊绾闻渲?,但是實(shí)際上業(yè)界也沒(méi)有一個(gè)統(tǒng)一的標(biāo)準(zhǔn)。雖然有些所謂的"公式",但是不同的業(yè)務(wù)場(chǎng)景復(fù)雜多變,配置原則也不盡相同。從實(shí)際經(jīng)驗(yàn)來(lái)看,IO密集型、CPU密集型應(yīng)用在線程配置上就比較懸殊,因此沒(méi)有一個(gè)通用的適合所有場(chǎng)景的公式。
那么我們換一種思路,就是既然不能明確配置,那么能不能支持動(dòng)態(tài)配置呢?答案是肯定的,因?yàn)榫€程池本身就支持核心線程數(shù)和最大線程數(shù)的修改,而且是實(shí)時(shí)生效的。 通常在生產(chǎn)環(huán)境中,我們可以實(shí)時(shí)監(jiān)控線程池的運(yùn)行狀態(tài),隨時(shí)掌握應(yīng)用服務(wù)的性能狀況,以便在系統(tǒng)資源緊張時(shí)及時(shí)告警,動(dòng)態(tài)調(diào)整線程配置,必要時(shí)進(jìn)行人工介入,排查問(wèn)題,線上修復(fù)。
也就是說(shuō),通過(guò)實(shí)時(shí)監(jiān)控,然后動(dòng)態(tài)修改。
2 監(jiān)控
我們知道,線程池使用不當(dāng)也會(huì)使服務(wù)器資源枯竭,導(dǎo)致異常情況的發(fā)生,比如固定線程池的阻塞隊(duì)列任務(wù)數(shù)量過(guò)多、緩存線程池創(chuàng)建的線程過(guò)多導(dǎo)致內(nèi)存溢出、系統(tǒng)假死等問(wèn)題。因此,我們需要一種簡(jiǎn)單的監(jiān)控方案來(lái)監(jiān)控線程池的使用情況,比如完成任務(wù)數(shù)量、未完成任務(wù)數(shù)量、線程大小等信息。
線程池的監(jiān)控分為2種類型,一種是在執(zhí)行任務(wù)前后全量統(tǒng)計(jì)任務(wù)排隊(duì)時(shí)間和執(zhí)行時(shí)間,另外一種是通過(guò)定時(shí)任務(wù),定時(shí)獲取活躍線程數(shù),隊(duì)列中的任務(wù)數(shù),核心線程數(shù),最大線程數(shù)等數(shù)據(jù)。
2.1 MonitoredThreadPoolStatisticsExecutor全量統(tǒng)計(jì)
參數(shù)名稱 | 說(shuō)明 |
---|---|
poolName | 線程池的名稱 |
timeout | 預(yù)設(shè)的任務(wù)超時(shí)時(shí)間閾值 |
taskTimeoutFlag | 是否記錄任務(wù)超時(shí)次數(shù) |
execTimeout | 任務(wù)執(zhí)行超時(shí)時(shí)間閾值 |
taskExecTimeoutFlag | 是否記錄任務(wù)執(zhí)行超時(shí)次數(shù) |
waitInQueueTimeout | 任務(wù)在隊(duì)列中等待的時(shí)間閾值 |
taskWaitInQueueTimeoutFlag | 是否記錄任務(wù)等待時(shí)間超時(shí)次數(shù) |
queueSizeWarningPercent | 任務(wù)隊(duì)列使用率告警閾值 |
queueSizeWarningFlag | 是否進(jìn)行隊(duì)列容量告警 |
queueSizeHasWarningFlag | 是否需要隊(duì)列容量告警(隊(duì)列是否曾經(jīng)達(dá)到過(guò)預(yù)警值) |
taskTotalTime | 任務(wù)總時(shí)長(zhǎng),以任務(wù)提交時(shí)間進(jìn)行計(jì)時(shí),單位 ms |
taskTotalExecTime | 任務(wù)總執(zhí)行時(shí)長(zhǎng),以任務(wù)開(kāi)始執(zhí)行進(jìn)行計(jì)時(shí),單位 ms |
minTaskTime | 最短任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms |
maxTaskTime | 最長(zhǎng)任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms |
taskTimeoutCount | 任務(wù)超時(shí)次數(shù),以任務(wù)提交進(jìn)行計(jì)時(shí) |
taskExecTimeoutCount | 任務(wù)執(zhí)行超時(shí)次數(shù),以任務(wù)開(kāi)始執(zhí)行時(shí)間進(jìn)行計(jì)時(shí) |
taskWaitInQueueTimeoutCount | 任務(wù)等待時(shí)間超過(guò)設(shè)定的閾值的次數(shù) |
minTaskExecTime | 最短任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms |
maxTaskExecTime | 最長(zhǎng)任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms |
activeCount | 線程池中正在執(zhí)行任務(wù)的線程數(shù)量 |
completedTaskCount | 線程池已完成的任務(wù)數(shù)量,該值小于等于taskCount |
corePoolSize | 線程池的核心線程數(shù)量 |
largestPoolSize | 線程池曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是否滿過(guò),也就是達(dá)到了maximumPoolSize |
maximumPoolSize | 線程池的最大線程數(shù)量 |
poolSize | 線程池當(dāng)前的線程數(shù)量 |
taskCount | 線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù) |
為了簡(jiǎn)化,代碼中的監(jiān)控?cái)?shù)據(jù)都是通過(guò)日志打印,實(shí)際中是通過(guò)kafka收集,然后做出可視化監(jiān)控。
/** * 自定義可監(jiān)控的線程池 */ public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean { /** * 線程池的名稱 */ private String poolName; /** * 預(yù)設(shè)的任務(wù)超時(shí)時(shí)間閾值,用于統(tǒng)計(jì)功能。 * 以任務(wù)提交時(shí)間進(jìn)行計(jì)時(shí),單位 ms,大于0則記錄超時(shí)次數(shù)。 */ private long timeout = 120000l; /** * 是否記錄任務(wù)超時(shí)次數(shù) */ private boolean taskTimeoutFlag = false; /** * 任務(wù)執(zhí)行超時(shí)時(shí)間閾值,用于統(tǒng)計(jì)功能。 * 以任務(wù)開(kāi)始執(zhí)行進(jìn)行計(jì)時(shí),單位 ms,大于 0 則記錄任務(wù)執(zhí)行超時(shí)次數(shù)。 */ private long execTimeout = 120000l; /** * 是否記錄任務(wù)執(zhí)行超時(shí)次數(shù) */ private boolean taskExecTimeoutFlag = false; /** * 任務(wù)在隊(duì)列中等待的時(shí)間閾值,用于統(tǒng)計(jì)功能。 * 以任務(wù)提交時(shí)間開(kāi)始計(jì)時(shí)到開(kāi)始執(zhí)行為止,單位 ms。 */ private long waitInQueueTimeout = 60000l; /** * 是否記錄任務(wù)等待時(shí)間超時(shí)次數(shù) */ private boolean taskWaitInQueueTimeoutFlag = false; /** * 任務(wù)隊(duì)列使用率告警閾值 */ private int queueSizeWarningPercent = 80; /** * 是否進(jìn)行隊(duì)列容量告警 */ private boolean queueSizeWarningFlag = false; /** * 是否需要隊(duì)列容量告警(隊(duì)列是否曾經(jīng)達(dá)到過(guò)預(yù)警值) */ private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false); /** * 任務(wù)總時(shí)長(zhǎng),用于統(tǒng)計(jì)功能。以任務(wù)提交時(shí)間進(jìn)行計(jì)時(shí),單位 ms */ private AtomicLong taskTotalTime = new AtomicLong(0); /** * 任務(wù)總執(zhí)行時(shí)長(zhǎng),用于統(tǒng)計(jì)功能。以任務(wù)開(kāi)始執(zhí)行進(jìn)行計(jì)時(shí),單位 ms */ private AtomicLong taskTotalExecTime = new AtomicLong(0); /** * 最短任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms */ private long minTaskTime = Long.MAX_VALUE; /** * 最長(zhǎng)任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms */ private long maxTaskTime = 0; /** * 任務(wù)超時(shí)次數(shù),以任務(wù)提交進(jìn)行計(jì)時(shí) */ private AtomicLong taskTimeoutCount = new AtomicLong(0); /** * 任務(wù)執(zhí)行超時(shí)次數(shù),以任務(wù)開(kāi)始執(zhí)行時(shí)間進(jìn)行計(jì)時(shí) */ private AtomicLong taskExecTimeoutCount = new AtomicLong(0); /** * 任務(wù)等待時(shí)間超過(guò)設(shè)定的閾值的次數(shù) */ private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0); /** * 最短任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms */ private long minTaskExecTime = Long.MAX_VALUE; /** * 最長(zhǎng)任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms */ private long maxTaskExecTime = 0; /** * 保存任務(wù)信息 */ private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>(); public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler); this.poolName = poolName; this.timeout = timeout; this.execTimeout = execTimeout; this.waitInQueueTimeout = waitInQueueTimeout; this.queueSizeWarningPercent = queueSizeWarningPercent; if (this.timeout > 0) { this.taskTimeoutFlag = true; } if (this.execTimeout > 0) { this.taskExecTimeoutFlag = true; } if (this.waitInQueueTimeout > 0) { this.taskWaitInQueueTimeoutFlag = true; } if (this.queueSizeWarningPercent > 0) { this.queueSizeWarningFlag = true; } ThreadPoolMonitor.monitor(this); } public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName)); this.poolName = poolName; this.timeout = timeout; this.execTimeout = execTimeout; this.waitInQueueTimeout = waitInQueueTimeout; this.queueSizeWarningPercent = queueSizeWarningPercent; if (this.timeout > 0) { this.taskTimeoutFlag = true; } if (this.execTimeout > 0) { this.taskExecTimeoutFlag = true; } if (this.waitInQueueTimeout > 0) { this.taskWaitInQueueTimeoutFlag = true; } if (this.queueSizeWarningPercent > 0) { this.queueSizeWarningFlag = true; } ThreadPoolMonitor.monitor(this); } @Override public void execute(Runnable command) { this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics()); if (this.queueSizeWarningFlag) { float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity()); BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP); int usedPercent = bd.multiply(new BigDecimal(100)).intValue(); if (usedPercent > this.queueSizeWarningPercent) { this.queueSizeHasWarningFlag.set(true); System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity()); } } super.execute(command); } @Override protected void beforeExecute(Thread t, Runnable r) { TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode())); if (null != taskStatistics) { taskStatistics.setStartExecTime(System.currentTimeMillis()); } super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { //重寫(xiě)此方法做一些統(tǒng)計(jì)功能 long endTime = System.currentTimeMillis(); TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode())); if (null != taskStatistics) { long taskTotalTime = endTime - taskStatistics.getCommitTime(); long taskExecTime = endTime - taskStatistics.getStartExecTime(); long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime(); this.taskTotalTime.addAndGet(taskTotalTime); this.taskTotalExecTime.addAndGet(taskExecTime); if (this.minTaskTime > taskTotalTime) { this.minTaskTime = taskTotalTime; } if (this.maxTaskTime < taskTotalTime) { this.maxTaskTime = taskTotalTime; } if (this.taskTimeoutFlag && taskTotalTime > this.timeout) { this.taskTimeoutCount.incrementAndGet(); } if (this.minTaskExecTime > taskExecTime) { this.minTaskExecTime = taskExecTime; } if (this.maxTaskExecTime < taskExecTime) { this.maxTaskExecTime = taskExecTime; } if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) { this.taskExecTimeoutCount.incrementAndGet(); } if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) { this.taskWaitInQueueTimeoutCount.incrementAndGet(); } System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]"); // 初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、 // 已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊(duì)列里緩存的任務(wù)數(shù)量、池中存在的最大線程數(shù)、 // 最大允許的線程數(shù)、線程空閑時(shí)間、線程池是否關(guān)閉、線程池是否終止 LOGGER.info("{}-pool-monitor: " + " PoolSize: {}, CorePoolSize: {}, Active: {}, " + "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " + "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); } super.afterExecute(r, t); } /** * Spring容器管理線程池的生命周期,線程池Bean銷毀之前先關(guān)閉 * @throws Exception */ @Override public void destroy() throws Exception { shutdown(); } /** * 線程池延遲關(guān)閉時(shí)(等待線程池里的任務(wù)都執(zhí)行完畢),統(tǒng)計(jì)線程池情況 */ @Override public void shutdown() { // 統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); } /** * 線程池立即關(guān)閉時(shí),統(tǒng)計(jì)線程池情況 */ @Override public List<Runnable> shutdownNow() { // 統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); } /** * 任務(wù)平均時(shí)長(zhǎng),無(wú)已完成任務(wù)時(shí),返回 0 */ public long getTaskAvgTime() { if (this.getCompletedTaskCount() > 0) { return this.getTaskTotalTime().get() / this.getCompletedTaskCount(); } return 0; } /** * 任務(wù)平均執(zhí)行時(shí)長(zhǎng),無(wú)已完成任務(wù)時(shí),返回 0 */ public long getTaskAvgExecTime() { if (this.getCompletedTaskCount() > 0) { return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount(); } return 0; } //省略setter/getter方法 }
public class TaskStatistics { /** * 任務(wù)提交時(shí)間 */ private long commitTime; /** * 任務(wù)開(kāi)始執(zhí)行時(shí)間 */ private long startExecTime; public TaskStatistics() { this.commitTime = System.currentTimeMillis(); } }
方法 | 含義 |
---|---|
shutdown() | 線程池延遲關(guān)閉時(shí)(等待線程池里的任務(wù)都執(zhí)行完畢),統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量 |
shutdownNow() | 線程池立即關(guān)閉時(shí),統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量 |
beforeExecute(Thread t, Runnable r) | 任務(wù)執(zhí)行之前,記錄任務(wù)開(kāi)始時(shí)間,startTimes這個(gè)HashMap以任務(wù)的hashCode為key,開(kāi)始時(shí)間為值 |
afterExecute(Runnable r, Throwable t) | 任務(wù)執(zhí)行之后,計(jì)算任務(wù)結(jié)束時(shí)間。統(tǒng)計(jì)任務(wù)耗時(shí)、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊(duì)列里緩存的任務(wù)數(shù)量、池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時(shí)間、線程池是否關(guān)閉、線程池是否終止信息 |
注意事項(xiàng):
- 在 afterExecute 方法中需要注意,需要調(diào)用 ConcurrentHashMap 的 remove 方法移除并返回任務(wù)的開(kāi)始時(shí)間信息,而不是調(diào)用 get 方法,因?yàn)樵诟卟l(fā)情況下,線程池里要執(zhí)行的任務(wù)很多,如果只獲取值不移除的話,會(huì)使 ConcurrentHashMap 越來(lái)越大,引發(fā)內(nèi)存泄漏或溢出問(wèn)題。
2.2 定時(shí)采集
public class ThreadPoolMonitor { private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>(); private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor")); private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L; public ThreadPoolMonitor() { } public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) { if (threadPoolExecutor instanceof MonitoredThreadPoolStatisticsExecutor) { throw new IllegalArgumentException("MonitoredThreadPoolStatisticsExecutor is already monitored."); } else { monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS); } } public static void remove(String name) { ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name); if (futureWrapper != null) { futureWrapper.future.cancel(false); } } public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) { ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name); if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) { POOL_TASK_FUTURE_MAP.remove(name, futureWrapper); futureWrapper.future.cancel(false); } } static void monitor(MonitoredThreadPoolStatisticsExecutor threadPoolExecutor) { monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS); } private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) { PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name); POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> { if (v == null) { return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor); } else { throw new IllegalStateException("duplicate pool name: " + name); } }); } static { Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown)); } static class FutureWrapper { private final Future<?> future; private final ThreadPoolExecutor threadPoolExecutor; public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) { this.future = future; this.threadPoolExecutor = threadPoolExecutor; } } }
public class PoolMonitorTask implements Runnable { private final ThreadPoolExecutor monitoredThreadPool; private final String poolName; private volatile long lastTaskCount = 0L; public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) { this.monitoredThreadPool = monitoredThreadPool; this.poolName = poolName; } @Override public void run() { int activeCount = this.monitoredThreadPool.getActiveCount(); int corePoolSize = this.monitoredThreadPool.getCorePoolSize(); int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize(); int queueTaskSize = this.monitoredThreadPool.getQueue().size(); long taskCount = this.monitoredThreadPool.getTaskCount(); int executedTask = (int) (taskCount - this.lastTaskCount); log.info("線程池名稱 = {}, 活躍線程數(shù)峰值 = {}, 隊(duì)列任務(wù)數(shù)峰值 = {}, 核心線程數(shù) = {}, 最大線程數(shù) = {}, 執(zhí)行的任務(wù)總數(shù) = {}", this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask); this.lastTaskCount = taskCount; if (this.monitoredThreadPool.isTerminated()) { ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool); } } }
2.3 可視化
通過(guò)Kafka獲取到監(jiān)控?cái)?shù)據(jù)后,可以做一個(gè)可視化頁(yè)面,比如可以展示下面這這些數(shù)據(jù)
active/coreSize :活動(dòng)線程數(shù)和核心線程數(shù)的比值, 其中active = executor.getActiveCount(),表示所有運(yùn)行中的工作線程的數(shù)量,這個(gè)比值反應(yīng)線程池的線程活躍狀態(tài),如果一直維持在一個(gè)很低的水平,則說(shuō)明線程池需要進(jìn)行縮容;如果長(zhǎng)時(shí)間維持一個(gè)很大的數(shù)值,說(shuō)明活躍度好,線程池利用率高。
active/maxSize :活動(dòng)線程數(shù)和最大線程數(shù)的比值,這個(gè)值可以配合上面的 active/coreSize 來(lái)看,當(dāng)active/coreSize大于100%的時(shí)候,如果active/maxSize維持在一個(gè)較低的值,則說(shuō)明當(dāng)前線程池的負(fù)載偏低,如果大于60%或者更高,則說(shuō)明線程池過(guò)載,需要及時(shí)調(diào)整線程池容量配置。
completedTaskCount:執(zhí)行完畢的工作線程的總數(shù),包含歷史所有。
largestPoolSize:歷史上線程池容量觸達(dá)過(guò)的最大值
rejectCount:被拒絕的線程的數(shù)量,如果大量線程被拒絕,則說(shuō)明當(dāng)前線程池已經(jīng)溢出了,需要及時(shí)調(diào)整線程池配置
queueSize:隊(duì)列中工作線程的數(shù)量,如果大量的線程池在排隊(duì),說(shuō)明coreSize已經(jīng)不夠用了,可以根據(jù)實(shí)際情況來(lái)調(diào)整,對(duì)于執(zhí)行時(shí)間要求很?chē)?yán)格的業(yè)務(wù)場(chǎng)景,可能需要通過(guò)提升coreSize來(lái)減少排隊(duì)情況。
3 動(dòng)態(tài)調(diào)整線程池
配置線程池的大小可根據(jù)以下幾個(gè)維度進(jìn)行分析來(lái)配置合理的線程數(shù):
任務(wù)性質(zhì)可分為:CPU密集型任務(wù),IO密集型任務(wù),混合型任務(wù),任務(wù)的執(zhí)行時(shí)長(zhǎng),任務(wù)是否有依賴——依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接等。
1、CPU密集型任務(wù) 盡量使用較小的線程池,一般為CPU核數(shù)+1。 因?yàn)镃PU密集型任務(wù)使得CPU使用率很高,若開(kāi)過(guò)多的線程數(shù),只能增加上下文切換的次數(shù),因此會(huì)帶來(lái)額外的開(kāi)銷。
2、IO密集型任務(wù) 可以使用稍大的線程池,一般為2*CPU核數(shù)+1。 因?yàn)镮O操作不占用CPU,不要讓CPU閑下來(lái),應(yīng)加大線程數(shù)量,因此可以讓CPU在等待IO的時(shí)候去處理別的任務(wù),充分利用CPU時(shí)間。
3、混合型任務(wù) 可以將任務(wù)分成IO密集型和CPU密集型任務(wù),然后分別用不同的線程池去處理。 只要分完之后兩個(gè)任務(wù)的執(zhí)行時(shí)間相差不大,那么就會(huì)比串行執(zhí)行來(lái)的高效。 因?yàn)槿绻麆澐种髢蓚€(gè)任務(wù)執(zhí)行時(shí)間相差甚遠(yuǎn),那么先執(zhí)行完的任務(wù)就要等后執(zhí)行完的任務(wù),最終的時(shí)間仍然取決于后執(zhí)行完的任務(wù),而且還要加上任務(wù)拆分與合并的開(kāi)銷,得不償失
4、依賴其他資源 如某個(gè)任務(wù)依賴數(shù)據(jù)庫(kù)的連接返回的結(jié)果,這時(shí)候等待的時(shí)間越長(zhǎng),則CPU空閑的時(shí)間越長(zhǎng),那么線程數(shù)量應(yīng)設(shè)置得越大,才能更好的利用CPU。
小結(jié):線程等待時(shí)間所占比例越高,需要越多線程。線程CPU時(shí)間所占比例越高,需要越少線程。
但是實(shí)踐發(fā)現(xiàn),盡管我們經(jīng)過(guò)謹(jǐn)慎的評(píng)估,仍然不能夠保證一次計(jì)算出來(lái)合適的參數(shù),那么我們是否可以將修改線程池參數(shù)的成本降下來(lái),這樣至少可以發(fā)生故障的時(shí)候可以快速調(diào)整從而縮短故障恢復(fù)的時(shí)間呢? 基于這個(gè)思考,我們是否可以將線程池的參數(shù)從代碼中遷移到分布式配置中心上,實(shí)現(xiàn)線程池參數(shù)可動(dòng)態(tài)配置和即時(shí)生效,線程池參數(shù)動(dòng)態(tài)化前后的參數(shù)修改流程對(duì)比如下:
3.1 修改參數(shù)
實(shí)際應(yīng)用中主要有下列參數(shù)可以支持動(dòng)態(tài)修改。
線程池參數(shù) | 說(shuō)明 |
---|---|
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
queueCapacity | 等待隊(duì)列大小 |
timeout | 任務(wù)超時(shí)時(shí)間告警閾值 |
execTimeout | 任務(wù)執(zhí)行超時(shí)時(shí)間告警閾值 |
queuedTaskWarningSize | 等待隊(duì)列排隊(duì)數(shù)量告警閾值 |
checkInterval | 線程池定時(shí)監(jiān)控時(shí)間間隔 |
autoExtend | 是否自動(dòng)擴(kuò)容 |
其中的corePoolSize、maximumPoolSize都可以使用ThreadPoolExecutor提供的api實(shí)現(xiàn): public void setCorePoolSize(int corePoolSize) public void setMaximumPoolSize(int maximumPoolSize)
從ThreadPoolExecutor源碼中可知,
設(shè)置新的核心線程數(shù)時(shí), 如果設(shè)置的新值小于當(dāng)前值,多余的現(xiàn)有線程將在下一次空閑時(shí)終止,如果新設(shè)置的corePoolSize值更大,將在需要時(shí)啟動(dòng)新線程來(lái)執(zhí)行任何排隊(duì)的任務(wù);
設(shè)置新的最大線程數(shù)時(shí),如果新值小于當(dāng)前值,多余的現(xiàn)有線程將在下一次空閑時(shí)終止。
ThreadPoolExecutor沒(méi)有提供直接修改等待隊(duì)列大小的api。這就需要我們自定義一個(gè)可以修改容量的隊(duì)列。其實(shí)很簡(jiǎn)單,只要把jdk原生的隊(duì)列中的容量設(shè)置為可以修改,并提供修改方法即可。 比如把jdk中的LinkedBlockingQueue拷貝一份,命名為CapacityResizableLinkedBlockingQueue。 將其capacity的屬性變?yōu)榭勺兊?,并提供set方法:
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; //將上述原生代碼改為: private volatile int capacity; public void setCapacity(int capacity) { this.capacity = capacity; }
3.2 配置監(jiān)聽(tīng)
可以通過(guò)配置中心的動(dòng)態(tài)加載來(lái)處理,以Apollo為例,我們可以利用Apollo的ChangeListener來(lái)實(shí)現(xiàn)對(duì)配置變更的監(jiān)聽(tīng),(如果是MySQL,可以修改完配置后直接同過(guò)HTTP接口通知客戶端進(jìn)行配置刷新),代碼片段如下:
public class ThreadPoolConfigUpdateListener { @Value("${apollo.bootstrap.namespaces:application}") private String namespace; @Autowired private DynamicThreadPoolFacade dynamicThreadPoolManager; @Autowired private DynamicThreadPoolProperties poolProperties; @PostConstruct public void init() { initConfigUpdateListener(); } public void initConfigUpdateListener() { String apolloNamespace = namespace; if (StringUtils.hasText(poolProperties.getApolloNamespace())) { apolloNamespace = poolProperties.getApolloNamespace(); } String finalApolloNamespace = apolloNamespace; Config config = ConfigService.getConfig(finalApolloNamespace); config.addChangeListener(changeEvent -> { try { Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000); } catch (InterruptedException e) { log.error("配置刷新異常",e); } dynamicThreadPoolManager.refreshThreadPoolExecutor(); log.info("線程池配置有變化,刷新完成"); }); } }
線程池配置的刷新的邏輯如下:
public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) { try { dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> { ThreadPoolMonitor threadPoolExecutor =ThreadPoolMonitor.POOL_TASK_FUTURE_MAP.get(poolProperties.getThreadPoolName()).getThreadPoolExecutor(); executor.setCorePoolSize(poolProperties.getCorePoolSize()); executor.setMaxPoolSize(poolProperties.getMaximumPoolSize()); executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime()); }); }catch(Exception e){ log.error("Executor 參數(shù)設(shè)置異常",e); } }
3.4 后臺(tái)管理
當(dāng)然可以通過(guò)管理后臺(tái)來(lái)動(dòng)態(tài)修改,如下圖,參數(shù)可保存在mysql
最后,一個(gè)簡(jiǎn)單的線程池監(jiān)控以及動(dòng)態(tài)修改架構(gòu)如下圖:
4 線程池如何保持有序/串行
線程池當(dāng)然是無(wú)序的,但是如果我們需要像kafka那樣分區(qū)有序怎么辦呢?
思路:建立多個(gè)只有一個(gè)線程的線程池,然后按照不同key 在 不同線程池上執(zhí)行,這樣key就相當(dāng)于kafka的分區(qū),只要key相同,那么提交的任務(wù)就會(huì)有序執(zhí)行。
demo如下
類似的功能在git有大神開(kāi)源了,可以去參考一下 https://github.com/PhantomThief/more-lambdas-java
以上就是JAVA線程池監(jiān)控以及動(dòng)態(tài)調(diào)整示例詳解的詳細(xì)內(nèi)容,更多關(guān)于JAVA線程池監(jiān)控動(dòng)態(tài)調(diào)整的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman
HTTP Client 是 IDEA 自帶的一款簡(jiǎn)潔輕量級(jí)的接口調(diào)用插件,通過(guò)它,我們能在 IDEA 上開(kāi)發(fā),調(diào)試,測(cè)試 RESTful Web 服務(wù),接下來(lái)通過(guò)本文給大家分享IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman的知識(shí),感興趣的朋友一起看看吧2021-05-05詳解java中的6種單例寫(xiě)法及優(yōu)缺點(diǎn)
在java中,單例有很多種寫(xiě)法,面試時(shí),手寫(xiě)代碼環(huán)節(jié),除了寫(xiě)算法題,有時(shí)候也會(huì)讓手寫(xiě)單例模式,這里記錄一下單例的幾種寫(xiě)法和優(yōu)缺點(diǎn)。需要的朋友可以參考下2018-11-11springboot yml中profiles的巧妙用法(小白必看多環(huán)境配置)
這篇文章主要介紹了springboot yml中profiles的巧妙用法,非常適合多環(huán)境配置場(chǎng)景,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04JPA添加Pageable實(shí)現(xiàn)翻頁(yè)時(shí)報(bào)錯(cuò)的問(wèn)題
這篇文章主要介紹了解決JPA添加Pageable實(shí)現(xiàn)翻頁(yè)時(shí)報(bào)錯(cuò)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn)
這篇文章主要介紹了Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12Java基于二叉查找樹(shù)實(shí)現(xiàn)排序功能示例
這篇文章主要介紹了Java基于二叉查找樹(shù)實(shí)現(xiàn)排序功能,結(jié)合實(shí)例形式分析了Java二叉查找樹(shù)的定義、遍歷及排序等相關(guān)操作技巧,需要的朋友可以參考下2017-08-08