欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

ThreadPoolExecutor核心線程數(shù)和RocketMQ消費線程調(diào)整詳解

 更新時間:2023年10月11日 10:25:51   作者:項哥  
這篇文章主要介紹了ThreadPoolExecutor核心線程數(shù)和RocketMQ消費線程調(diào)整詳解,Rocketmq 消費者在高峰期希望手動減少消費線程數(shù),通過DefaultMQPushConsumer.updateCorePoolSize方法可以調(diào)用內(nèi)部的setCorePoolSize設置多線程核心線程數(shù),需要的朋友可以參考下

背景

Rocketmq 消費者在高峰期希望手動減少消費線程數(shù),通過DefaultMQPushConsumer.updateCorePoolSize方法可以調(diào)用內(nèi)部的ThreadPoolExecutor.setCorePoolSize設置多線程核心線程數(shù)。

那么是否能夠通過調(diào)整參數(shù)動態(tài)調(diào)整Rocketmq消費者呢。

結(jié)論

  • 多線程ThreadPoolExecutor.setCorePoolSize可以修改核心線程數(shù),但是減少核心線程數(shù)不一定生效
  • 核心線程銷毀的前提是至少在keepAliveTime內(nèi)沒有新的任務提交

動態(tài)調(diào)整消費線程實現(xiàn)方案

  • 可以通過調(diào)整核心線程數(shù)減少RocketMQ 消費線程數(shù)
    • 先掛起消費者consumer.suspend()
    • 調(diào)用consumer.updateCorePoolSize更新核心線程數(shù)
    • 然后休眠至少1分鐘以上,等任務全部消費完成,1分鐘是基于ConsumeMessageConcurrentlyService中創(chuàng)建線程池默認參數(shù)1000*60 TimeUnit.MILLISECONDS得到的, 還需要加上本地隊列堆積任務消費完成時間
    • 恢復消費者consumer.resume()
consumer.suspend();
consumer.updateCorePoolSize(3);
try {
	TimeUnit.SECONDS.sleep(65000L);
 } catch (Exception e) {
	log.error("InterruptException", e);
}
consumer.resume();
  • 增加消費線程數(shù),直接通過consumer.updateCorePoolSize方法就可以實現(xiàn)

測試

ThreadTest.java

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ThreadTest {
   public static void main(String[] args) throws InterruptedException {
       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
               10,
               50,
               1000 * 60,
               TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<>(),
               new ThreadFactoryImpl("test" + "_" + "ConsumeMessageThread_"));
       for (int i = 0; i < 1000; i++) {
           threadPoolExecutor.submit(new Runnable() {
               @SneakyThrows
               @Override
               public void run() {
                   Thread.sleep(5);
                   log.info("hello");
               }
           });
       }
       log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
       Thread.sleep(10000L);
       threadPoolExecutor.setCorePoolSize(3);
       log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
       // Thread.sleep(1000*60); // 如果休眠時間大于KeepAliveTime將會只有3個線程
       Thread.sleep(1000L);  // 休眠時間不夠時仍然有10個線程
       for (int i = 0; i < 1000; i++) {
           threadPoolExecutor.submit(new Runnable() {
               @SneakyThrows
               @Override
               public void run() {
                   Thread.sleep(10);
                   log.info("hello2");
               }
           });
       }
   }
}

實驗證明setCorePoolSize在設置為3個線程以后,在第二批任務提交還是有10個線程在工作, 但是如果在第二批任務提交前休眠時間大于keepAliveTime以后則只會有3個工作線程

原理

源碼部分主要看是ThreadPoolExecutor中的workers變量,setCorePoolSize()方法,runWorker()方法,getTask()方法

  • 一個work在執(zhí)行runWorker()方法時只有在獲取任務getTask()方法返回null以后才會終止循環(huán),然后銷毀
  • getTask()方法從任務隊列中拿取任務等待keepAliveTime超時以后才會有可能返回null
    // 工作workers, work只有在獲取任務超時以后才會從workers中刪除
    private final HashSet<Worker> workers = new HashSet<Worker>();
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
        // 減少核心線程數(shù)以后進入interruptIdleWorkers方法
            interruptIdleWorkers();
        else if (delta > 0) {
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
     private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 在interruptIdleWorkers方法中只是將work的線程中斷,并沒有從workers刪除
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 重點是getTask()方法獲取task失敗才會中斷循環(huán)
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 超時以后進入這里的if返回null然后work才會被銷毀
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
   private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 這里才真正將worker刪除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

到此這篇關(guān)于ThreadPoolExecutor核心線程數(shù)和RocketMQ消費線程調(diào)整詳解的文章就介紹到這了,更多相關(guān)RocketMQ消費線程調(diào)整內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 接口簽名怎么用Java實現(xiàn)

    接口簽名怎么用Java實現(xiàn)

    今天帶大家學習java的相關(guān)知識,文章圍繞怎么用Java實現(xiàn)接口簽名展開,文中有非常詳細的代碼示例及介紹,需要的朋友可以參考下
    2021-06-06
  • SpringMVC全局異常處理的三種方式

    SpringMVC全局異常處理的三種方式

    這篇文章主要介紹了SpringMVC全局異常處理的三種方式,幫助大家更好的理解和使用springmvc,感興趣的朋友可以了解下
    2021-01-01
  • Java多線程連續(xù)打印abc實現(xiàn)方法詳解

    Java多線程連續(xù)打印abc實現(xiàn)方法詳解

    這篇文章主要介紹了Java多線程連續(xù)打印abc實現(xiàn)方法詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-03-03
  • 解讀nacos獲取配置文件的大致過程

    解讀nacos獲取配置文件的大致過程

    這篇文章主要介紹了nacos獲取配置文件的大致過程,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • java 線程池的實現(xiàn)方法

    java 線程池的實現(xiàn)方法

    在本篇文章里小編給大家整理了關(guān)于java 線程池的實現(xiàn)方法,有興趣的朋友們可以學習參考下。
    2020-02-02
  • Spring中的循環(huán)依賴問題

    Spring中的循環(huán)依賴問題

    在Spring框架中,循環(huán)依賴是指兩個或多個Bean相互依賴,這導致在Bean的創(chuàng)建過程中出現(xiàn)依賴死鎖,為了解決這一問題,Spring引入了三級緩存機制,包括singletonObjects、earlySingletonObjects和singletonFactories
    2024-09-09
  • IntelliJ IDEA 如何徹底刪除項目的步驟

    IntelliJ IDEA 如何徹底刪除項目的步驟

    本篇文章主要介紹了IntelliJ IDEA 如何徹底刪除項目的步驟,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-11-11
  • JAVA LinkedList和ArrayList的使用及性能分析

    JAVA LinkedList和ArrayList的使用及性能分析

    JAVA LinkedList和ArrayList的使用及性能分析,這篇文章也是以JAVA List的總結(jié)。
    2013-11-11
  • Java中常用的四種引用類型詳解

    Java中常用的四種引用類型詳解

    Java中常用的四種引用類型,分別為,強引用、軟引用、弱引用以及虛引用,這篇文章主要為大家介紹了這四種引用的用法,需要的可以參考一下
    2023-06-06
  • Java?Selenide?簡介與用法

    Java?Selenide?簡介與用法

    Selenium?是目前用的最廣泛的Web?UI?自動化測試框架,本文給大家介紹下Java?Selenide使用,感興趣的朋友一起看看吧
    2022-01-01

最新評論