欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

JAVA線程池監(jiān)控以及動態(tài)調(diào)整示例詳解

 更新時間:2023年09月21日 09:28:26   作者:雪飄千里  
這篇文章主要為大家介紹了JAVA線程池監(jiān)控以及動態(tài)調(diào)整示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

1 背景

Java線程池源碼分析 里雖然介紹了線程池的核心配置(核心線程數(shù)、最大線程數(shù)和隊列大小)該如何配置,但是實際上業(yè)界也沒有一個統(tǒng)一的標準。雖然有些所謂的"公式",但是不同的業(yè)務場景復雜多變,配置原則也不盡相同。從實際經(jīng)驗來看,IO密集型、CPU密集型應用在線程配置上就比較懸殊,因此沒有一個通用的適合所有場景的公式。

那么我們換一種思路,就是既然不能明確配置,那么能不能支持動態(tài)配置呢?答案是肯定的,因為線程池本身就支持核心線程數(shù)和最大線程數(shù)的修改,而且是實時生效的。 通常在生產(chǎn)環(huán)境中,我們可以實時監(jiān)控線程池的運行狀態(tài),隨時掌握應用服務的性能狀況,以便在系統(tǒng)資源緊張時及時告警,動態(tài)調(diào)整線程配置,必要時進行人工介入,排查問題,線上修復。

也就是說,通過實時監(jiān)控,然后動態(tài)修改。

2 監(jiān)控

我們知道,線程池使用不當也會使服務器資源枯竭,導致異常情況的發(fā)生,比如固定線程池的阻塞隊列任務數(shù)量過多、緩存線程池創(chuàng)建的線程過多導致內(nèi)存溢出、系統(tǒng)假死等問題。因此,我們需要一種簡單的監(jiān)控方案來監(jiān)控線程池的使用情況,比如完成任務數(shù)量、未完成任務數(shù)量、線程大小等信息。

線程池的監(jiān)控分為2種類型,一種是在執(zhí)行任務前后全量統(tǒng)計任務排隊時間和執(zhí)行時間,另外一種是通過定時任務,定時獲取活躍線程數(shù),隊列中的任務數(shù),核心線程數(shù),最大線程數(shù)等數(shù)據(jù)。

2.1 MonitoredThreadPoolStatisticsExecutor全量統(tǒng)計

參數(shù)名稱說明
poolName線程池的名稱
timeout預設的任務超時時間閾值
taskTimeoutFlag是否記錄任務超時次數(shù)
execTimeout任務執(zhí)行超時時間閾值
taskExecTimeoutFlag是否記錄任務執(zhí)行超時次數(shù)
waitInQueueTimeout任務在隊列中等待的時間閾值
taskWaitInQueueTimeoutFlag是否記錄任務等待時間超時次數(shù)
queueSizeWarningPercent任務隊列使用率告警閾值
queueSizeWarningFlag是否進行隊列容量告警
queueSizeHasWarningFlag是否需要隊列容量告警(隊列是否曾經(jīng)達到過預警值)
taskTotalTime任務總時長,以任務提交時間進行計時,單位 ms
taskTotalExecTime任務總執(zhí)行時長,以任務開始執(zhí)行進行計時,單位 ms
minTaskTime最短任務時長,以提交時間計時,單位 ms
maxTaskTime最長任務時長,以提交時間計時,單位 ms
taskTimeoutCount任務超時次數(shù),以任務提交進行計時
taskExecTimeoutCount任務執(zhí)行超時次數(shù),以任務開始執(zhí)行時間進行計時
taskWaitInQueueTimeoutCount任務等待時間超過設定的閾值的次數(shù)
minTaskExecTime最短任務時長,以執(zhí)行時間計時,單位 ms
maxTaskExecTime最長任務時長,以執(zhí)行時間計時,單位 ms
activeCount線程池中正在執(zhí)行任務的線程數(shù)量
completedTaskCount線程池已完成的任務數(shù)量,該值小于等于taskCount
corePoolSize線程池的核心線程數(shù)量
largestPoolSize線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過,也就是達到了maximumPoolSize
maximumPoolSize線程池的最大線程數(shù)量
poolSize線程池當前的線程數(shù)量
taskCount線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務總數(shù)

為了簡化,代碼中的監(jiān)控數(shù)據(jù)都是通過日志打印,實際中是通過kafka收集,然后做出可視化監(jiān)控。

/**
 * 自定義可監(jiān)控的線程池
 */
 public class MonitoredThreadPoolStatisticsExecutor extends ThreadPoolExecutor implements DisposableBean {
 /**
 * 線程池的名稱
 */
 private String poolName;
 /**
 * 預設的任務超時時間閾值,用于統(tǒng)計功能。     * 以任務提交時間進行計時,單位 ms,大于0則記錄超時次數(shù)。
 */
 private long timeout = 120000l;
 /**
 * 是否記錄任務超時次數(shù)
 */
 private boolean taskTimeoutFlag = false;
 /**
 * 任務執(zhí)行超時時間閾值,用于統(tǒng)計功能。     * 以任務開始執(zhí)行進行計時,單位 ms,大于 0 則記錄任務執(zhí)行超時次數(shù)。
 */
 private long execTimeout = 120000l;
 /**
 * 是否記錄任務執(zhí)行超時次數(shù)
 */
 private boolean taskExecTimeoutFlag = false;
 /**
 * 任務在隊列中等待的時間閾值,用于統(tǒng)計功能。     * 以任務提交時間開始計時到開始執(zhí)行為止,單位 ms。
 */
 private long waitInQueueTimeout = 60000l;
 /**
 * 是否記錄任務等待時間超時次數(shù)
 */
 private boolean taskWaitInQueueTimeoutFlag = false;
 /**
 * 任務隊列使用率告警閾值
 */
 private int queueSizeWarningPercent = 80;
 /**
 * 是否進行隊列容量告警
 */
 private boolean queueSizeWarningFlag = false;
 /**
 * 是否需要隊列容量告警(隊列是否曾經(jīng)達到過預警值)
 */
 private AtomicBoolean queueSizeHasWarningFlag = new AtomicBoolean(false);
 /**
 * 任務總時長,用于統(tǒng)計功能。以任務提交時間進行計時,單位 ms
 */
 private AtomicLong taskTotalTime = new AtomicLong(0);
 /**
 * 任務總執(zhí)行時長,用于統(tǒng)計功能。以任務開始執(zhí)行進行計時,單位 ms
 */
 private AtomicLong taskTotalExecTime = new AtomicLong(0);
 /**
 * 最短任務時長,以提交時間計時,單位 ms
 */
 private long minTaskTime = Long.MAX_VALUE;
 /**
 * 最長任務時長,以提交時間計時,單位 ms
 */
 private long maxTaskTime = 0;
 /**
 * 任務超時次數(shù),以任務提交進行計時
 */
 private AtomicLong taskTimeoutCount = new AtomicLong(0);
 /**
 * 任務執(zhí)行超時次數(shù),以任務開始執(zhí)行時間進行計時
 */
 private AtomicLong taskExecTimeoutCount = new AtomicLong(0);
 /**
 * 任務等待時間超過設定的閾值的次數(shù)
 */
 private AtomicLong taskWaitInQueueTimeoutCount = new AtomicLong(0);
 /**
 * 最短任務時長,以執(zhí)行時間計時,單位 ms
 */
 private long minTaskExecTime = Long.MAX_VALUE;
 /**
 * 最長任務時長,以執(zhí)行時間計時,單位 ms
 */
 private long maxTaskExecTime = 0;
 /**
 * 保存任務信息
 */
 private Map<String, TaskStatistics> taskInfoMap = new ConcurrentHashMap<String, TaskStatistics>();
 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), handler);
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }
 ThreadPoolMonitor.monitor(this);
 }
 public MonitoredThreadPoolStatisticsExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, long timeout, long execTimeout, long waitInQueueTimeout, int queueSizeWarningPercent) {
 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName));
 this.poolName = poolName;
 this.timeout = timeout;
 this.execTimeout = execTimeout;
 this.waitInQueueTimeout = waitInQueueTimeout;
 this.queueSizeWarningPercent = queueSizeWarningPercent;
 if (this.timeout > 0) {
 this.taskTimeoutFlag = true;
 }
 if (this.execTimeout > 0) {
 this.taskExecTimeoutFlag = true;
 }
 if (this.waitInQueueTimeout > 0) {
 this.taskWaitInQueueTimeoutFlag = true;
 }
 if (this.queueSizeWarningPercent > 0) {
 this.queueSizeWarningFlag = true;
 }
 ThreadPoolMonitor.monitor(this);
 }
 @Override
 public void execute(Runnable command) {
 this.taskInfoMap.put(String.valueOf(command.hashCode()), new TaskStatistics());
 if (this.queueSizeWarningFlag) {
 float f = (float) getQueue().size() / (getQueue().size() + getQueue().remainingCapacity());
 BigDecimal bd = new BigDecimal(f).setScale(2, BigDecimal.ROUND_HALF_UP);
 int usedPercent = bd.multiply(new BigDecimal(100)).intValue();
 if (usedPercent > this.queueSizeWarningPercent) {
 this.queueSizeHasWarningFlag.set(true);
 System.out.println("queueSize percent Warning!used:" + usedPercent + "%,qSize:" + getQueue().size() + ",remaining:" + getQueue().remainingCapacity());
 }
 }
 super.execute(command);
 }
 @Override
 protected void beforeExecute(Thread t, Runnable r) {
 TaskStatistics taskStatistics = this.taskInfoMap.get(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 taskStatistics.setStartExecTime(System.currentTimeMillis());
 }
 super.beforeExecute(t, r);
 }
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
 //重寫此方法做一些統(tǒng)計功能
 long endTime = System.currentTimeMillis();
 TaskStatistics taskStatistics = this.taskInfoMap.remove(String.valueOf(r.hashCode()));
 if (null != taskStatistics) {
 long taskTotalTime = endTime - taskStatistics.getCommitTime();
 long taskExecTime = endTime - taskStatistics.getStartExecTime();
 long taskWaitInQueueTime = taskStatistics.getStartExecTime() - taskStatistics.getCommitTime();
 this.taskTotalTime.addAndGet(taskTotalTime);
 this.taskTotalExecTime.addAndGet(taskExecTime);
 if (this.minTaskTime > taskTotalTime) {
 this.minTaskTime = taskTotalTime;
 }
 if (this.maxTaskTime < taskTotalTime) {
 this.maxTaskTime = taskTotalTime;
 }
 if (this.taskTimeoutFlag && taskTotalTime > this.timeout) {
 this.taskTimeoutCount.incrementAndGet();
 }
 if (this.minTaskExecTime > taskExecTime) {
 this.minTaskExecTime = taskExecTime;
 }
 if (this.maxTaskExecTime < taskExecTime) {
 this.maxTaskExecTime = taskExecTime;
 }
 if (this.taskExecTimeoutFlag && taskExecTime > this.execTimeout) {
 this.taskExecTimeoutCount.incrementAndGet();
 }
 if (this.taskWaitInQueueTimeoutFlag && taskWaitInQueueTime > this.waitInQueueTimeout) {
 this.taskWaitInQueueTimeoutCount.incrementAndGet();
 }
 System.out.println("task cost info[ taskTotalTime:" + taskTotalTime + ",taskExecTime:" + taskExecTime + ",taskWaitInQueueTime:" + taskWaitInQueueTime + " ]");
 // 初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務數(shù)量、
 // 已完成任務數(shù)量、任務總數(shù)、隊列里緩存的任務數(shù)量、池中存在的最大線程數(shù)、
 // 最大允許的線程數(shù)、線程空閑時間、線程池是否關閉、線程池是否終止
 LOGGER.info("{}-pool-monitor: " +
 " PoolSize: {}, CorePoolSize: {}, Active: {}, " +
 "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
 "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
 this.poolName,
 this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
 this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
 this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
 }
 super.afterExecute(r, t);
 }
 /**
 * Spring容器管理線程池的生命周期,線程池Bean銷毀之前先關閉     * @throws Exception
 */
 @Override
 public void destroy() throws Exception {
 shutdown();
 }
 /**
 * 線程池延遲關閉時(等待線程池里的任務都執(zhí)行完畢),統(tǒng)計線程池情況
 */
 @Override
 public void shutdown() {
 // 統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 super.shutdown();
 }
 /**
 * 線程池立即關閉時,統(tǒng)計線程池情況
 */
 @Override
 public List<Runnable> shutdownNow() {
 // 統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
 this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
 return super.shutdownNow();
 }
 /**
 * 任務平均時長,無已完成任務時,返回 0
 */
 public long getTaskAvgTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }
 /**
 * 任務平均執(zhí)行時長,無已完成任務時,返回 0
 */
 public long getTaskAvgExecTime() {
 if (this.getCompletedTaskCount() > 0) {
 return this.getTaskTotalExecTime().get() / this.getCompletedTaskCount();
 }
 return 0;
 }
 //省略setter/getter方法
} 
public class TaskStatistics {
 /**
 * 任務提交時間
 */
 private long commitTime;
 /**
 * 任務開始執(zhí)行時間
 */
 private long startExecTime;
 public TaskStatistics() {
 this.commitTime = System.currentTimeMillis();
 }
}
方法含義
shutdown()線程池延遲關閉時(等待線程池里的任務都執(zhí)行完畢),統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
shutdownNow()線程池立即關閉時,統(tǒng)計已執(zhí)行任務、正在執(zhí)行任務、未執(zhí)行任務數(shù)量
beforeExecute(Thread t, Runnable r)任務執(zhí)行之前,記錄任務開始時間,startTimes這個HashMap以任務的hashCode為key,開始時間為值
afterExecute(Runnable r, Throwable t)任務執(zhí)行之后,計算任務結束時間。統(tǒng)計任務耗時、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務數(shù)量、已完成任務數(shù)量、任務總數(shù)、隊列里緩存的任務數(shù)量、池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時間、線程池是否關閉、線程池是否終止信息

注意事項:

  • 在 afterExecute 方法中需要注意,需要調(diào)用 ConcurrentHashMap 的 remove 方法移除并返回任務的開始時間信息,而不是調(diào)用 get 方法,因為在高并發(fā)情況下,線程池里要執(zhí)行的任務很多,如果只獲取值不移除的話,會使 ConcurrentHashMap 越來越大,引發(fā)內(nèi)存泄漏或溢出問題。

2.2 定時采集

public class ThreadPoolMonitor {
 private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();
 private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));
 private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;
 public ThreadPoolMonitor() {
 }
 public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {
 if (threadPoolExecutor instanceof MonitoredThreadPoolStatisticsExecutor) {
 throw new IllegalArgumentException("MonitoredThreadPoolStatisticsExecutor is already monitored.");
 } else {
 monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
 }
 }
 public static void remove(String name) {
 ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);
 if (futureWrapper != null) {
 futureWrapper.future.cancel(false);
 }
 }
 public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {
 ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);
 if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {
 POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);
 futureWrapper.future.cancel(false);
 }
 }
 static void monitor(MonitoredThreadPoolStatisticsExecutor threadPoolExecutor) {
 monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
 }
 private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {
 PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);
 POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {
 if (v == null) {
 return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);
 } else {
 throw new IllegalStateException("duplicate pool name: " + name);
 }
 });
 }
 static {
 Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));
 }
 static class FutureWrapper {
 private final Future<?> future;
 private final ThreadPoolExecutor threadPoolExecutor;
 public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) {
 this.future = future;
 this.threadPoolExecutor = threadPoolExecutor;
 }
 }
}
public class PoolMonitorTask implements Runnable {
 private final ThreadPoolExecutor monitoredThreadPool;
 private final String poolName;
 private volatile long lastTaskCount = 0L;
 public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) {
 this.monitoredThreadPool = monitoredThreadPool;
 this.poolName = poolName;
 }
 @Override
 public void run() {
 int activeCount = this.monitoredThreadPool.getActiveCount();
 int corePoolSize = this.monitoredThreadPool.getCorePoolSize();
 int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize();
 int queueTaskSize = this.monitoredThreadPool.getQueue().size();
 long taskCount = this.monitoredThreadPool.getTaskCount();
 int executedTask = (int) (taskCount - this.lastTaskCount);
 log.info("線程池名稱 = {}, 活躍線程數(shù)峰值 = {}, 隊列任務數(shù)峰值 = {}, 核心線程數(shù) = {}, 最大線程數(shù) = {}, 執(zhí)行的任務總數(shù) = {}",
 this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask);
 this.lastTaskCount = taskCount;
 if (this.monitoredThreadPool.isTerminated()) {
 ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool);
 }
 }
}

2.3 可視化

通過Kafka獲取到監(jiān)控數(shù)據(jù)后,可以做一個可視化頁面,比如可以展示下面這這些數(shù)據(jù)

  • active/coreSize :活動線程數(shù)和核心線程數(shù)的比值, 其中active = executor.getActiveCount(),表示所有運行中的工作線程的數(shù)量,這個比值反應線程池的線程活躍狀態(tài),如果一直維持在一個很低的水平,則說明線程池需要進行縮容;如果長時間維持一個很大的數(shù)值,說明活躍度好,線程池利用率高。

  • active/maxSize :活動線程數(shù)和最大線程數(shù)的比值,這個值可以配合上面的 active/coreSize 來看,當active/coreSize大于100%的時候,如果active/maxSize維持在一個較低的值,則說明當前線程池的負載偏低,如果大于60%或者更高,則說明線程池過載,需要及時調(diào)整線程池容量配置。

  • completedTaskCount:執(zhí)行完畢的工作線程的總數(shù),包含歷史所有。

  • largestPoolSize:歷史上線程池容量觸達過的最大值

  • rejectCount:被拒絕的線程的數(shù)量,如果大量線程被拒絕,則說明當前線程池已經(jīng)溢出了,需要及時調(diào)整線程池配置

  • queueSize:隊列中工作線程的數(shù)量,如果大量的線程池在排隊,說明coreSize已經(jīng)不夠用了,可以根據(jù)實際情況來調(diào)整,對于執(zhí)行時間要求很嚴格的業(yè)務場景,可能需要通過提升coreSize來減少排隊情況。

3 動態(tài)調(diào)整線程池

配置線程池的大小可根據(jù)以下幾個維度進行分析來配置合理的線程數(shù):

任務性質可分為:CPU密集型任務,IO密集型任務,混合型任務,任務的執(zhí)行時長,任務是否有依賴——依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接等。

1、CPU密集型任務 盡量使用較小的線程池,一般為CPU核數(shù)+1。 因為CPU密集型任務使得CPU使用率很高,若開過多的線程數(shù),只能增加上下文切換的次數(shù),因此會帶來額外的開銷。

2、IO密集型任務 可以使用稍大的線程池,一般為2*CPU核數(shù)+1。 因為IO操作不占用CPU,不要讓CPU閑下來,應加大線程數(shù)量,因此可以讓CPU在等待IO的時候去處理別的任務,充分利用CPU時間。

3、混合型任務 可以將任務分成IO密集型和CPU密集型任務,然后分別用不同的線程池去處理。 只要分完之后兩個任務的執(zhí)行時間相差不大,那么就會比串行執(zhí)行來的高效。 因為如果劃分之后兩個任務執(zhí)行時間相差甚遠,那么先執(zhí)行完的任務就要等后執(zhí)行完的任務,最終的時間仍然取決于后執(zhí)行完的任務,而且還要加上任務拆分與合并的開銷,得不償失

4、依賴其他資源 如某個任務依賴數(shù)據(jù)庫的連接返回的結果,這時候等待的時間越長,則CPU空閑的時間越長,那么線程數(shù)量應設置得越大,才能更好的利用CPU。

小結:線程等待時間所占比例越高,需要越多線程。線程CPU時間所占比例越高,需要越少線程。

但是實踐發(fā)現(xiàn),盡管我們經(jīng)過謹慎的評估,仍然不能夠保證一次計算出來合適的參數(shù),那么我們是否可以將修改線程池參數(shù)的成本降下來,這樣至少可以發(fā)生故障的時候可以快速調(diào)整從而縮短故障恢復的時間呢? 基于這個思考,我們是否可以將線程池的參數(shù)從代碼中遷移到分布式配置中心上,實現(xiàn)線程池參數(shù)可動態(tài)配置和即時生效,線程池參數(shù)動態(tài)化前后的參數(shù)修改流程對比如下:

3.1 修改參數(shù)

實際應用中主要有下列參數(shù)可以支持動態(tài)修改。

線程池參數(shù)說明
corePoolSize核心線程數(shù)
maximumPoolSize最大線程數(shù)
queueCapacity等待隊列大小
timeout任務超時時間告警閾值
execTimeout任務執(zhí)行超時時間告警閾值
queuedTaskWarningSize等待隊列排隊數(shù)量告警閾值
checkInterval線程池定時監(jiān)控時間間隔
autoExtend是否自動擴容

其中的corePoolSize、maximumPoolSize都可以使用ThreadPoolExecutor提供的api實現(xiàn): public void setCorePoolSize(int corePoolSize) public void setMaximumPoolSize(int maximumPoolSize)

從ThreadPoolExecutor源碼中可知,

設置新的核心線程數(shù)時, 如果設置的新值小于當前值,多余的現(xiàn)有線程將在下一次空閑時終止,如果新設置的corePoolSize值更大,將在需要時啟動新線程來執(zhí)行任何排隊的任務;

設置新的最大線程數(shù)時,如果新值小于當前值,多余的現(xiàn)有線程將在下一次空閑時終止。

ThreadPoolExecutor沒有提供直接修改等待隊列大小的api。這就需要我們自定義一個可以修改容量的隊列。其實很簡單,只要把jdk原生的隊列中的容量設置為可以修改,并提供修改方法即可。 比如把jdk中的LinkedBlockingQueue拷貝一份,命名為CapacityResizableLinkedBlockingQueue。 將其capacity的屬性變?yōu)榭勺兊?,并提供set方法:

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
//將上述原生代碼改為:
private volatile int capacity;
public void setCapacity(int capacity) { 
 this.capacity = capacity;
}

3.2 配置監(jiān)聽

可以通過配置中心的動態(tài)加載來處理,以Apollo為例,我們可以利用Apollo的ChangeListener來實現(xiàn)對配置變更的監(jiān)聽,(如果是MySQL,可以修改完配置后直接同過HTTP接口通知客戶端進行配置刷新),代碼片段如下:

public class ThreadPoolConfigUpdateListener {
 @Value("${apollo.bootstrap.namespaces:application}")
 private String namespace;
 @Autowired
 private DynamicThreadPoolFacade dynamicThreadPoolManager;
 @Autowired
 private DynamicThreadPoolProperties poolProperties;
 @PostConstruct
 public void init() {
 initConfigUpdateListener();
 }
 public void initConfigUpdateListener() {
 String apolloNamespace = namespace;
 if (StringUtils.hasText(poolProperties.getApolloNamespace())) {
 apolloNamespace = poolProperties.getApolloNamespace();
 }
 String finalApolloNamespace = apolloNamespace;
 Config config = ConfigService.getConfig(finalApolloNamespace);
 config.addChangeListener(changeEvent -> {
 try {
 Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000);
 } catch (InterruptedException e) {
 log.error("配置刷新異常",e);
 }
 dynamicThreadPoolManager.refreshThreadPoolExecutor();
 log.info("線程池配置有變化,刷新完成");
 });
 }
}

線程池配置的刷新的邏輯如下:

public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
 try {
 dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> {
 ThreadPoolMonitor threadPoolExecutor =ThreadPoolMonitor.POOL_TASK_FUTURE_MAP.get(poolProperties.getThreadPoolName()).getThreadPoolExecutor();
 executor.setCorePoolSize(poolProperties.getCorePoolSize());
 executor.setMaxPoolSize(poolProperties.getMaximumPoolSize());
 executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime());
 });
 }catch(Exception e){
 log.error("Executor 參數(shù)設置異常",e);
 }
 }

3.4 后臺管理

當然可以通過管理后臺來動態(tài)修改,如下圖,參數(shù)可保存在mysql

最后,一個簡單的線程池監(jiān)控以及動態(tài)修改架構如下圖:

4 線程池如何保持有序/串行

線程池當然是無序的,但是如果我們需要像kafka那樣分區(qū)有序怎么辦呢?

思路:建立多個只有一個線程的線程池,然后按照不同key 在 不同線程池上執(zhí)行,這樣key就相當于kafka的分區(qū),只要key相同,那么提交的任務就會有序執(zhí)行。

demo如下

類似的功能在git有大神開源了,可以去參考一下 https://github.com/PhantomThief/more-lambdas-java

以上就是JAVA線程池監(jiān)控以及動態(tài)調(diào)整示例詳解的詳細內(nèi)容,更多關于JAVA線程池監(jiān)控動態(tài)調(diào)整的資料請關注腳本之家其它相關文章!

相關文章

  • 詳解IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman

    詳解IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman

    HTTP Client 是 IDEA 自帶的一款簡潔輕量級的接口調(diào)用插件,通過它,我們能在 IDEA 上開發(fā),調(diào)試,測試 RESTful Web 服務,接下來通過本文給大家分享IntelliJ IDEA 自帶的 HTTP Client 接口調(diào)用插件吊打 Postman的知識,感興趣的朋友一起看看吧
    2021-05-05
  • 詳解java中的6種單例寫法及優(yōu)缺點

    詳解java中的6種單例寫法及優(yōu)缺點

    在java中,單例有很多種寫法,面試時,手寫代碼環(huán)節(jié),除了寫算法題,有時候也會讓手寫單例模式,這里記錄一下單例的幾種寫法和優(yōu)缺點。需要的朋友可以參考下
    2018-11-11
  • springboot yml中profiles的巧妙用法(小白必看多環(huán)境配置)

    springboot yml中profiles的巧妙用法(小白必看多環(huán)境配置)

    這篇文章主要介紹了springboot yml中profiles的巧妙用法,非常適合多環(huán)境配置場景,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-04-04
  • Java實例講解文件上傳與跨域問題

    Java實例講解文件上傳與跨域問題

    這篇文章主要介紹了Java文件上傳與跨域問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-09-09
  • JPA添加Pageable實現(xiàn)翻頁時報錯的問題

    JPA添加Pageable實現(xiàn)翻頁時報錯的問題

    這篇文章主要介紹了解決JPA添加Pageable實現(xiàn)翻頁時報錯的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • 詳解spring boot集成RabbitMQ

    詳解spring boot集成RabbitMQ

    RabbitMQ作為AMQP的代表性產(chǎn)品,在項目中大量使用。結合現(xiàn)在主流的spring boot,極大簡化了開發(fā)過程中所涉及到的消息通信問題。
    2017-03-03
  • Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn)

    Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn)

    這篇文章主要介紹了Springboot+Bootstrap實現(xiàn)增刪改查實戰(zhàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-12-12
  • mybatisPlus批量插入優(yōu)化加快性能

    mybatisPlus批量插入優(yōu)化加快性能

    這篇文章主要介紹了mybatisPlus批量插入優(yōu)化加快性能,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-12-12
  • Java基于二叉查找樹實現(xiàn)排序功能示例

    Java基于二叉查找樹實現(xiàn)排序功能示例

    這篇文章主要介紹了Java基于二叉查找樹實現(xiàn)排序功能,結合實例形式分析了Java二叉查找樹的定義、遍歷及排序等相關操作技巧,需要的朋友可以參考下
    2017-08-08
  • springboot 獲取工具類bean過程詳解

    springboot 獲取工具類bean過程詳解

    這篇文章主要介紹了springboot 獲取工具類bean過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-09-09

最新評論