欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

JAVA線程池監(jiān)控以及動(dòng)態(tài)調(diào)整示例詳解

 更新時(shí)間:2023年09月21日 09:28:26   作者:雪飄千里  
這篇文章主要為大家介紹了JAVA線程池監(jiān)控以及動(dòng)態(tài)調(diào)整示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

1 背景

Java線程池源碼分析 里雖然介紹了線程池的核心配置(核心線程數(shù)、最大線程數(shù)和隊(duì)列大?。┰撊绾闻渲?,但是實(shí)際上業(yè)界也沒(méi)有一個(gè)統(tǒng)一的標(biāo)準(zhǔn)。雖然有些所謂的"公式",但是不同的業(yè)務(wù)場(chǎng)景復(fù)雜多變,配置原則也不盡相同。從實(shí)際經(jīng)驗(yàn)來(lái)看,IO密集型、CPU密集型應(yīng)用在線程配置上就比較懸殊,因此沒(méi)有一個(gè)通用的適合所有場(chǎng)景的公式。

那么我們換一種思路,就是既然不能明確配置,那么能不能支持動(dòng)態(tài)配置呢?答案是肯定的,因?yàn)榫€程池本身就支持核心線程數(shù)和最大線程數(shù)的修改,而且是實(shí)時(shí)生效的。 通常在生產(chǎn)環(huán)境中,我們可以實(shí)時(shí)監(jiān)控線程池的運(yùn)行狀態(tài),隨時(shí)掌握應(yīng)用服務(wù)的性能狀況,以便在系統(tǒng)資源緊張時(shí)及時(shí)告警,動(dòng)態(tài)調(diào)整線程配置,必要時(shí)進(jìn)行人工介入,排查問(wèn)題,線上修復(fù)。

也就是說(shuō),通過(guò)實(shí)時(shí)監(jiān)控,然后動(dòng)態(tài)修改。

2 監(jiān)控

我們知道,線程池使用不當(dāng)也會(huì)使服務(wù)器資源枯竭,導(dǎo)致異常情況的發(fā)生,比如固定線程池的阻塞隊(duì)列任務(wù)數(shù)量過(guò)多、緩存線程池創(chuàng)建的線程過(guò)多導(dǎo)致內(nèi)存溢出、系統(tǒng)假死等問(wèn)題。因此,我們需要一種簡(jiǎn)單的監(jiān)控方案來(lái)監(jiān)控線程池的使用情況,比如完成任務(wù)數(shù)量、未完成任務(wù)數(shù)量、線程大小等信息。

線程池的監(jiān)控分為2種類型,一種是在執(zhí)行任務(wù)前后全量統(tǒng)計(jì)任務(wù)排隊(duì)時(shí)間和執(zhí)行時(shí)間,另外一種是通過(guò)定時(shí)任務(wù),定時(shí)獲取活躍線程數(shù),隊(duì)列中的任務(wù)數(shù),核心線程數(shù),最大線程數(shù)等數(shù)據(jù)。

2.1 MonitoredThreadPoolStatisticsExecutor全量統(tǒng)計(jì)

參數(shù)名稱說(shuō)明
poolName線程池的名稱
timeout預(yù)設(shè)的任務(wù)超時(shí)時(shí)間閾值
taskTimeoutFlag是否記錄任務(wù)超時(shí)次數(shù)
execTimeout任務(wù)執(zhí)行超時(shí)時(shí)間閾值
taskExecTimeoutFlag是否記錄任務(wù)執(zhí)行超時(shí)次數(shù)
waitInQueueTimeout任務(wù)在隊(duì)列中等待的時(shí)間閾值
taskWaitInQueueTimeoutFlag是否記錄任務(wù)等待時(shí)間超時(shí)次數(shù)
queueSizeWarningPercent任務(wù)隊(duì)列使用率告警閾值
queueSizeWarningFlag是否進(jìn)行隊(duì)列容量告警
queueSizeHasWarningFlag是否需要隊(duì)列容量告警(隊(duì)列是否曾經(jīng)達(dá)到過(guò)預(yù)警值)
taskTotalTime任務(wù)總時(shí)長(zhǎng),以任務(wù)提交時(shí)間進(jìn)行計(jì)時(shí),單位 ms
taskTotalExecTime任務(wù)總執(zhí)行時(shí)長(zhǎng),以任務(wù)開(kāi)始執(zhí)行進(jìn)行計(jì)時(shí),單位 ms
minTaskTime最短任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms
maxTaskTime最長(zhǎng)任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms
taskTimeoutCount任務(wù)超時(shí)次數(shù),以任務(wù)提交進(jìn)行計(jì)時(shí)
taskExecTimeoutCount任務(wù)執(zhí)行超時(shí)次數(shù),以任務(wù)開(kāi)始執(zhí)行時(shí)間進(jìn)行計(jì)時(shí)
taskWaitInQueueTimeoutCount任務(wù)等待時(shí)間超過(guò)設(shè)定的閾值的次數(shù)
minTaskExecTime最短任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms
maxTaskExecTime最長(zhǎng)任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms
activeCount線程池中正在執(zhí)行任務(wù)的線程數(shù)量
completedTaskCount線程池已完成的任務(wù)數(shù)量,該值小于等于taskCount
corePoolSize線程池的核心線程數(shù)量
largestPoolSize線程池曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是否滿過(guò),也就是達(dá)到了maximumPoolSize
maximumPoolSize線程池的最大線程數(shù)量
poolSize線程池當(dāng)前的線程數(shù)量
taskCount線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù)

為了簡(jiǎn)化,代碼中的監(jiān)控?cái)?shù)據(jù)都是通過(guò)日志打印,實(shí)際中是通過(guò)kafka收集,然后做出可視化監(jiān)控。

/**
 * 自定義可監(jiān)控的線程池
 */
 public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean {
 /**
 * 線程池的名稱
 */
 private String poolName;
 /**
 * 預(yù)設(shè)的任務(wù)超時(shí)時(shí)間閾值,用于統(tǒng)計(jì)功能。     * 以任務(wù)提交時(shí)間進(jìn)行計(jì)時(shí),單位 ms,大于0則記錄超時(shí)次數(shù)。
 */
 private long timeout = 120000l;
 /**
 * 是否記錄任務(wù)超時(shí)次數(shù)
 */
 private boolean taskTimeoutFlag = false;
 /**
 * 任務(wù)執(zhí)行超時(shí)時(shí)間閾值,用于統(tǒng)計(jì)功能。     * 以任務(wù)開(kāi)始執(zhí)行進(jìn)行計(jì)時(shí),單位 ms,大于 0 則記錄任務(wù)執(zhí)行超時(shí)次數(shù)。
 */
 private long execTimeout = 120000l;
 /**
 * 是否記錄任務(wù)執(zhí)行超時(shí)次數(shù)
 */
 private boolean taskExecTimeoutFlag = false;
 /**
 * 任務(wù)在隊(duì)列中等待的時(shí)間閾值,用于統(tǒng)計(jì)功能。     * 以任務(wù)提交時(shí)間開(kāi)始計(jì)時(shí)到開(kāi)始執(zhí)行為止,單位 ms。
 */
 private long waitInQueueTimeout = 60000l;
 /**
 * 是否記錄任務(wù)等待時(shí)間超時(shí)次數(shù)
 */
 private boolean taskWaitInQueueTimeoutFlag = false;
 /**
 * 任務(wù)隊(duì)列使用率告警閾值
 */
 private int queueSizeWarningPercent = 80;
 /**
 * 是否進(jìn)行隊(duì)列容量告警
 */
 private boolean queueSizeWarningFlag = false;
 /**
 * 是否需要隊(duì)列容量告警(隊(duì)列是否曾經(jīng)達(dá)到過(guò)預(yù)警值)
 */
 private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false);
 /**
 * 任務(wù)總時(shí)長(zhǎng),用于統(tǒng)計(jì)功能。以任務(wù)提交時(shí)間進(jìn)行計(jì)時(shí),單位 ms
 */
 private AtomicLong taskTotalTime = new AtomicLong(0);
 /**
 * 任務(wù)總執(zhí)行時(shí)長(zhǎng),用于統(tǒng)計(jì)功能。以任務(wù)開(kāi)始執(zhí)行進(jìn)行計(jì)時(shí),單位 ms
 */
 private AtomicLong taskTotalExecTime = new AtomicLong(0);
 /**
 * 最短任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms
 */
 private long minTaskTime = Long.MAX_VALUE;
 /**
 * 最長(zhǎng)任務(wù)時(shí)長(zhǎng),以提交時(shí)間計(jì)時(shí),單位 ms
 */
 private long maxTaskTime = 0;
 /**
 * 任務(wù)超時(shí)次數(shù),以任務(wù)提交進(jìn)行計(jì)時(shí)
 */
 private AtomicLong taskTimeoutCount = new AtomicLong(0);
 /**
 * 任務(wù)執(zhí)行超時(shí)次數(shù),以任務(wù)開(kāi)始執(zhí)行時(shí)間進(jìn)行計(jì)時(shí)
 */
 private AtomicLong taskExecTimeoutCount = new AtomicLong(0);
 /**
 * 任務(wù)等待時(shí)間超過(guò)設(shè)定的閾值的次數(shù)
 */
 private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0);
 /**
 * 最短任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms
 */
 private long minTaskExecTime = Long.MAX_VALUE;
 /**
 * 最長(zhǎng)任務(wù)時(shí)長(zhǎng),以執(zhí)行時(shí)間計(jì)時(shí),單位 ms
 */
 private long maxTaskExecTime = 0;
 /**
 * 保存任務(wù)信息
 */
 private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>();
 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler);
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }
 ThreadPoolMonitor.monitor(this);
 }
 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName));
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }
 ThreadPoolMonitor.monitor(this);
 }
 @Override
 public void execute(Runnable command) {
 this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics());
 if (this.queueSizeWarningFlag) {
 float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity());
 BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP);
 int usedPercent = bd.multiply(new BigDecimal(100)).intValue();
 if (usedPercent > this.queueSizeWarningPercent) {
 this.queueSizeHasWarningFlag.set(true);
 System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity());
 }
 }
 super.execute(command);
 }
 @Override
 protected void beforeExecute(Thread t, Runnable r) {
 TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 taskStatistics.setStartExecTime(System.currentTimeMillis());
 }
 super.beforeExecute(t, r);
 }
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
 //重寫(xiě)此方法做一些統(tǒng)計(jì)功能
 long endTime = System.currentTimeMillis();
 TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 long taskTotalTime = endTime - taskStatistics.getCommitTime();
 long taskExecTime = endTime - taskStatistics.getStartExecTime();
 long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime();
 this.taskTotalTime.addAndGet(taskTotalTime);
 this.taskTotalExecTime.addAndGet(taskExecTime);
 if (this.minTaskTime > taskTotalTime) {
 this.minTaskTime = taskTotalTime;
 }
 if (this.maxTaskTime < taskTotalTime) {
 this.maxTaskTime = taskTotalTime;
 }
 if (this.taskTimeoutFlag && taskTotalTime > this.timeout) {
 this.taskTimeoutCount.incrementAndGet();
 }
 if (this.minTaskExecTime > taskExecTime) {
 this.minTaskExecTime = taskExecTime;
 }
 if (this.maxTaskExecTime < taskExecTime) {
 this.maxTaskExecTime = taskExecTime;
 }
 if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) {
 this.taskExecTimeoutCount.incrementAndGet();
 }
 if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) {
 this.taskWaitInQueueTimeoutCount.incrementAndGet();
 }
 System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]");
 // 初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、
 // 已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊(duì)列里緩存的任務(wù)數(shù)量、池中存在的最大線程數(shù)、
 // 最大允許的線程數(shù)、線程空閑時(shí)間、線程池是否關(guān)閉、線程池是否終止
 LOGGER.info("{}-pool-monitor: " +
 " PoolSize: {}, CorePoolSize: {}, Active: {}, " +
 "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
 "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
 this.poolName,
 this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
 this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
 this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
 }
 super.afterExecute(r, t);
 }
 /**
 * Spring容器管理線程池的生命周期,線程池Bean銷毀之前先關(guān)閉     * @throws Exception
 */
 @Override
 public void destroy() throws Exception {
 shutdown();
 }
 /**
 * 線程池延遲關(guān)閉時(shí)(等待線程池里的任務(wù)都執(zhí)行完畢),統(tǒng)計(jì)線程池情況
 */
 @Override
 public void shutdown() {
 // 統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 super.shutdown();
 }
 /**
 * 線程池立即關(guān)閉時(shí),統(tǒng)計(jì)線程池情況
 */
 @Override
 public List<Runnable> shutdownNow() {
 // 統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 return super.shutdownNow();
 }
 /**
 * 任務(wù)平均時(shí)長(zhǎng),無(wú)已完成任務(wù)時(shí),返回 0
 */
 public long getTaskAvgTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }
 /**
 * 任務(wù)平均執(zhí)行時(shí)長(zhǎng),無(wú)已完成任務(wù)時(shí),返回 0
 */
 public long getTaskAvgExecTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }
 //省略setter/getter方法
} 
public class TaskStatistics {
 /**
 * 任務(wù)提交時(shí)間
 */
 private long commitTime;
 /**
 * 任務(wù)開(kāi)始執(zhí)行時(shí)間
 */
 private long startExecTime;
 public TaskStatistics() {
 this.commitTime = System.currentTimeMillis();
 }
}
方法含義
shutdown()線程池延遲關(guān)閉時(shí)(等待線程池里的任務(wù)都執(zhí)行完畢),統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
shutdownNow()線程池立即關(guān)閉時(shí),統(tǒng)計(jì)已執(zhí)行任務(wù)、正在執(zhí)行任務(wù)、未執(zhí)行任務(wù)數(shù)量
beforeExecute(Thread t, Runnable r)任務(wù)執(zhí)行之前,記錄任務(wù)開(kāi)始時(shí)間,startTimes這個(gè)HashMap以任務(wù)的hashCode為key,開(kāi)始時(shí)間為值
afterExecute(Runnable r, Throwable t)任務(wù)執(zhí)行之后,計(jì)算任務(wù)結(jié)束時(shí)間。統(tǒng)計(jì)任務(wù)耗時(shí)、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊(duì)列里緩存的任務(wù)數(shù)量、池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時(shí)間、線程池是否關(guān)閉、線程池是否終止信息

注意事項(xiàng):

  • 在 afterExecute 方法中需要注意,需要調(diào)用 ConcurrentHashMap 的 remove 方法移除并返回任務(wù)的開(kāi)始時(shí)間信息,而不是調(diào)用 get 方法,因?yàn)樵诟卟l(fā)情況下,線程池里要執(zhí)行的任務(wù)很多,如果只獲取值不移除的話,會(huì)使 ConcurrentHashMap 越來(lái)越大,引發(fā)內(nèi)存泄漏或溢出問(wèn)題。

2.2 定時(shí)采集

public class ThreadPoolMonitor {
 private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();
 private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));
 private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;
 public ThreadPoolMonitor() {
 }
 public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {
 if (threadPoolExecutor instanceof MonitoredThreadPoolStatisticsExecutor) {
 throw new IllegalArgumentException("MonitoredThreadPoolStatisticsExecutor is already monitored.");
 } else {
 monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
 }
 }
 public static void remove(String name) {
 ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);
 if (futureWrapper != null) {
 futureWrapper.future.cancel(false);
 }
 }
 public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {
 ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);
 if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {
 POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);
 futureWrapper.future.cancel(false);
 }
 }
 static void monitor(MonitoredThreadPoolStatisticsExecutor threadPoolExecutor) {
 monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
 }
 private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {
 PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);
 POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {
 if (v == null) {
 return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);
 } else {
 throw new IllegalStateException("duplicate pool name: " + name);
 }
 });
 }
 static {
 Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));
 }
 static class FutureWrapper {
 private final Future<?> future;
 private final ThreadPoolExecutor threadPoolExecutor;
 public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) {
 this.future = future;
 this.threadPoolExecutor = threadPoolExecutor;
 }
 }
}
public class PoolMonitorTask implements Runnable {
 private final ThreadPoolExecutor monitoredThreadPool;
 private final String poolName;
 private volatile long lastTaskCount = 0L;
 public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) {
 this.monitoredThreadPool = monitoredThreadPool;
 this.poolName = poolName;
 }
 @Override
 public void run() {
 int activeCount = this.monitoredThreadPool.getActiveCount();
 int corePoolSize = this.monitoredThreadPool.getCorePoolSize();
 int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize();
 int queueTaskSize = this.monitoredThreadPool.getQueue().size();
 long taskCount = this.monitoredThreadPool.getTaskCount();
 int executedTask = (int) (taskCount - this.lastTaskCount);
 log.info("線程池名稱 = {}, 活躍線程數(shù)峰值 = {}, 隊(duì)列任務(wù)數(shù)峰值 = {}, 核心線程數(shù) = {}, 最大線程數(shù) = {}, 執(zhí)行的任務(wù)總數(shù) = {}",
 this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask);
 this.lastTaskCount = taskCount;
 if (this.monitoredThreadPool.isTerminated()) {
 ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool);
 }
 }
}

2.3 可視化

通過(guò)Kafka獲取到監(jiān)控?cái)?shù)據(jù)后,可以做一個(gè)可視化頁(yè)面,比如可以展示下面這這些數(shù)據(jù)

  • active/coreSize :活動(dòng)線程數(shù)和核心線程數(shù)的比值, 其中active = executor.getActiveCount(),表示所有運(yùn)行中的工作線程的數(shù)量,這個(gè)比值反應(yīng)線程池的線程活躍狀態(tài),如果一直維持在一個(gè)很低的水平,則說(shuō)明線程池需要進(jìn)行縮容;如果長(zhǎng)時(shí)間維持一個(gè)很大的數(shù)值,說(shuō)明活躍度好,線程池利用率高。

  • active/maxSize :活動(dòng)線程數(shù)和最大線程數(shù)的比值,這個(gè)值可以配合上面的 active/coreSize 來(lái)看,當(dāng)active/coreSize大于100%的時(shí)候,如果active/maxSize維持在一個(gè)較低的值,則說(shuō)明當(dāng)前線程池的負(fù)載偏低,如果大于60%或者更高,則說(shuō)明線程池過(guò)載,需要及時(shí)調(diào)整線程池容量配置。

  • completedTaskCount:執(zhí)行完畢的工作線程的總數(shù),包含歷史所有。

  • largestPoolSize:歷史上線程池容量觸達(dá)過(guò)的最大值

  • rejectCount:被拒絕的線程的數(shù)量,如果大量線程被拒絕,則說(shuō)明當(dāng)前線程池已經(jīng)溢出了,需要及時(shí)調(diào)整線程池配置

  • queueSize:隊(duì)列中工作線程的數(shù)量,如果大量的線程池在排隊(duì),說(shuō)明coreSize已經(jīng)不夠用了,可以根據(jù)實(shí)際情況來(lái)調(diào)整,對(duì)于執(zhí)行時(shí)間要求很?chē)?yán)格的業(yè)務(wù)場(chǎng)景,可能需要通過(guò)提升coreSize來(lái)減少排隊(duì)情況。

3 動(dòng)態(tài)調(diào)整線程池

配置線程池的大小可根據(jù)以下幾個(gè)維度進(jìn)行分析來(lái)配置合理的線程數(shù):

任務(wù)性質(zhì)可分為:CPU密集型任務(wù),IO密集型任務(wù),混合型任務(wù),任務(wù)的執(zhí)行時(shí)長(zhǎng),任務(wù)是否有依賴——依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接等。

1、CPU密集型任務(wù) 盡量使用較小的線程池,一般為CPU核數(shù)+1。 因?yàn)镃PU密集型任務(wù)使得CPU使用率很高,若開(kāi)過(guò)多的線程數(shù),只能增加上下文切換的次數(shù),因此會(huì)帶來(lái)額外的開(kāi)銷。

2、IO密集型任務(wù) 可以使用稍大的線程池,一般為2*CPU核數(shù)+1。 因?yàn)镮O操作不占用CPU,不要讓CPU閑下來(lái),應(yīng)加大線程數(shù)量,因此可以讓CPU在等待IO的時(shí)候去處理別的任務(wù),充分利用CPU時(shí)間。

3、混合型任務(wù) 可以將任務(wù)分成IO密集型和CPU密集型任務(wù),然后分別用不同的線程池去處理。 只要分完之后兩個(gè)任務(wù)的執(zhí)行時(shí)間相差不大,那么就會(huì)比串行執(zhí)行來(lái)的高效。 因?yàn)槿绻麆澐种髢蓚€(gè)任務(wù)執(zhí)行時(shí)間相差甚遠(yuǎn),那么先執(zhí)行完的任務(wù)就要等后執(zhí)行完的任務(wù),最終的時(shí)間仍然取決于后執(zhí)行完的任務(wù),而且還要加上任務(wù)拆分與合并的開(kāi)銷,得不償失

4、依賴其他資源 如某個(gè)任務(wù)依賴數(shù)據(jù)庫(kù)的連接返回的結(jié)果,這時(shí)候等待的時(shí)間越長(zhǎng),則CPU空閑的時(shí)間越長(zhǎng),那么線程數(shù)量應(yīng)設(shè)置得越大,才能更好的利用CPU。

小結(jié):線程等待時(shí)間所占比例越高,需要越多線程。線程CPU時(shí)間所占比例越高,需要越少線程。

但是實(shí)踐發(fā)現(xiàn),盡管我們經(jīng)過(guò)謹(jǐn)慎的評(píng)估,仍然不能夠保證一次計(jì)算出來(lái)合適的參數(shù),那么我們是否可以將修改線程池參數(shù)的成本降下來(lái),這樣至少可以發(fā)生故障的時(shí)候可以快速調(diào)整從而縮短故障恢復(fù)的時(shí)間呢? 基于這個(gè)思考,我們是否可以將線程池的參數(shù)從代碼中遷移到分布式配置中心上,實(shí)現(xiàn)線程池參數(shù)可動(dòng)態(tài)配置和即時(shí)生效,線程池參數(shù)動(dòng)態(tài)化前后的參數(shù)修改流程對(duì)比如下:

3.1 修改參數(shù)

實(shí)際應(yīng)用中主要有下列參數(shù)可以支持動(dòng)態(tài)修改。

線程池參數(shù)說(shuō)明
corePoolSize核心線程數(shù)
maximumPoolSize最大線程數(shù)
queueCapacity等待隊(duì)列大小
timeout任務(wù)超時(shí)時(shí)間告警閾值
execTimeout任務(wù)執(zhí)行超時(shí)時(shí)間告警閾值
queuedTaskWarningSize等待隊(duì)列排隊(duì)數(shù)量告警閾值
checkInterval線程池定時(shí)監(jiān)控時(shí)間間隔
autoExtend是否自動(dòng)擴(kuò)容

其中的corePoolSize、maximumPoolSize都可以使用ThreadPoolExecutor提供的api實(shí)現(xiàn): public void setCorePoolSize(int corePoolSize) public void setMaximumPoolSize(int maximumPoolSize)

從ThreadPoolExecutor源碼中可知,

設(shè)置新的核心線程數(shù)時(shí), 如果設(shè)置的新值小于當(dāng)前值,多余的現(xiàn)有線程將在下一次空閑時(shí)終止,如果新設(shè)置的corePoolSize值更大,將在需要時(shí)啟動(dòng)新線程來(lái)執(zhí)行任何排隊(duì)的任務(wù);

設(shè)置新的最大線程數(shù)時(shí),如果新值小于當(dāng)前值,多余的現(xiàn)有線程將在下一次空閑時(shí)終止。

ThreadPoolExecutor沒(méi)有提供直接修改等待隊(duì)列大小的api。這就需要我們自定義一個(gè)可以修改容量的隊(duì)列。其實(shí)很簡(jiǎn)單,只要把jdk原生的隊(duì)列中的容量設(shè)置為可以修改,并提供修改方法即可。 比如把jdk中的LinkedBlockingQueue拷貝一份,命名為CapacityResizableLinkedBlockingQueue。 將其capacity的屬性變?yōu)榭勺兊?,并提供set方法:

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
//將上述原生代碼改為:
private volatile int capacity;
public void setCapacity(int capacity) { 
 this.capacity = capacity;
}

3.2 配置監(jiān)聽(tīng)

可以通過(guò)配置中心的動(dòng)態(tài)加載來(lái)處理,以Apollo為例,我們可以利用Apollo的ChangeListener來(lái)實(shí)現(xiàn)對(duì)配置變更的監(jiān)聽(tīng),(如果是MySQL,可以修改完配置后直接同過(guò)HTTP接口通知客戶端進(jìn)行配置刷新),代碼片段如下:

public class ThreadPoolConfigUpdateListener {
 @Value("${apollo.bootstrap.namespaces:application}")
 private String namespace;
 @Autowired
 private DynamicThreadPoolFacade dynamicThreadPoolManager;
 @Autowired
 private DynamicThreadPoolProperties poolProperties;
 @PostConstruct
 public void init() {
 initConfigUpdateListener();
 }
 public void initConfigUpdateListener() {
 String apolloNamespace = namespace;
 if (StringUtils.hasText(poolProperties.getApolloNamespace())) {
 apolloNamespace = poolProperties.getApolloNamespace();
 }
 String finalApolloNamespace = apolloNamespace;
 Config config = ConfigService.getConfig(finalApolloNamespace);
 config.addChangeListener(changeEvent -> {
 try {
 Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000);
 } catch (InterruptedException e) {
 log.error("配置刷新異常",e);
 }
 dynamicThreadPoolManager.refreshThreadPoolExecutor();
 log.info("線程池配置有變化,刷新完成");
 });
 }
}

線程池配置的刷新的邏輯如下:

public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
 try {
 dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> {
 ThreadPoolMonitor threadPoolExecutor =ThreadPoolMonitor.POOL_TASK_FUTURE_MAP.get(poolProperties.getThreadPoolName()).getThreadPoolExecutor();
 executor.setCorePoolSize(poolProperties.getCorePoolSize());
 executor.setMaxPoolSize(poolProperties.getMaximumPoolSize());
 executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime());
 });
 }catch(Exception e){
 log.error("Executor 參數(shù)設(shè)置異常",e);
 }
 }

3.4 后臺(tái)管理

當(dāng)然可以通過(guò)管理后臺(tái)來(lái)動(dòng)態(tài)修改,如下圖,參數(shù)可保存在mysql

最后,一個(gè)簡(jiǎn)單的線程池監(jiān)控以及動(dòng)態(tài)修改架構(gòu)如下圖:

4 線程池如何保持有序/串行

線程池當(dāng)然是無(wú)序的,但是如果我們需要像kafka那樣分區(qū)有序怎么辦呢?

思路:建立多個(gè)只有一個(gè)線程的線程池,然后按照不同key 在 不同線程池上執(zhí)行,這樣key就相當(dāng)于kafka的分區(qū),只要key相同,那么提交的任務(wù)就會(huì)有序執(zhí)行。

demo如下

類似的功能在git有大神開(kāi)源了,可以去參考一下 https://github.com/PhantomThief/more-lambdas-java

以上就是JAVA線程池監(jiān)控以及動(dòng)態(tài)調(diào)整示例詳解的詳細(xì)內(nèi)容,更多關(guān)于JAVA線程池監(jiān)控動(dòng)態(tài)調(diào)整的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 詳解IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman

    詳解IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman

    HTTP Client 是 IDEA 自帶的一款簡(jiǎn)潔輕量級(jí)的接口調(diào)用插件,通過(guò)它,我們能在 IDEA 上開(kāi)發(fā),調(diào)試,測(cè)試 RESTful Web 服務(wù),接下來(lái)通過(guò)本文給大家分享IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman的知識(shí),感興趣的朋友一起看看吧
    2021-05-05
  • 詳解java中的6種單例寫(xiě)法及優(yōu)缺點(diǎn)

    詳解java中的6種單例寫(xiě)法及優(yōu)缺點(diǎn)

    在java中,單例有很多種寫(xiě)法,面試時(shí),手寫(xiě)代碼環(huán)節(jié),除了寫(xiě)算法題,有時(shí)候也會(huì)讓手寫(xiě)單例模式,這里記錄一下單例的幾種寫(xiě)法和優(yōu)缺點(diǎn)。需要的朋友可以參考下
    2018-11-11
  • springboot yml中profiles的巧妙用法(小白必看多環(huán)境配置)

    springboot yml中profiles的巧妙用法(小白必看多環(huán)境配置)

    這篇文章主要介紹了springboot yml中profiles的巧妙用法,非常適合多環(huán)境配置場(chǎng)景,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • Java實(shí)例講解文件上傳與跨域問(wèn)題

    Java實(shí)例講解文件上傳與跨域問(wèn)題

    這篇文章主要介紹了Java文件上傳與跨域問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09
  • JPA添加Pageable實(shí)現(xiàn)翻頁(yè)時(shí)報(bào)錯(cuò)的問(wèn)題

    JPA添加Pageable實(shí)現(xiàn)翻頁(yè)時(shí)報(bào)錯(cuò)的問(wèn)題

    這篇文章主要介紹了解決JPA添加Pageable實(shí)現(xiàn)翻頁(yè)時(shí)報(bào)錯(cuò)的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • 詳解spring boot集成RabbitMQ

    詳解spring boot集成RabbitMQ

    RabbitMQ作為AMQP的代表性產(chǎn)品,在項(xiàng)目中大量使用。結(jié)合現(xiàn)在主流的spring boot,極大簡(jiǎn)化了開(kāi)發(fā)過(guò)程中所涉及到的消息通信問(wèn)題。
    2017-03-03
  • Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn)

    Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn)

    這篇文章主要介紹了Springboot+Bootstrap實(shí)現(xiàn)增刪改查實(shí)戰(zhàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • mybatisPlus批量插入優(yōu)化加快性能

    mybatisPlus批量插入優(yōu)化加快性能

    這篇文章主要介紹了mybatisPlus批量插入優(yōu)化加快性能,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-12-12
  • Java基于二叉查找樹(shù)實(shí)現(xiàn)排序功能示例

    Java基于二叉查找樹(shù)實(shí)現(xiàn)排序功能示例

    這篇文章主要介紹了Java基于二叉查找樹(shù)實(shí)現(xiàn)排序功能,結(jié)合實(shí)例形式分析了Java二叉查找樹(shù)的定義、遍歷及排序等相關(guān)操作技巧,需要的朋友可以參考下
    2017-08-08
  • springboot 獲取工具類bean過(guò)程詳解

    springboot 獲取工具類bean過(guò)程詳解

    這篇文章主要介紹了springboot 獲取工具類bean過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09

最新評(píng)論