欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

ThreadPoolExecutor核心線程數(shù)和RocketMQ消費(fèi)線程調(diào)整詳解

 更新時(shí)間:2023年10月11日 10:25:51   作者:項(xiàng)哥  
這篇文章主要介紹了ThreadPoolExecutor核心線程數(shù)和RocketMQ消費(fèi)線程調(diào)整詳解,Rocketmq 消費(fèi)者在高峰期希望手動(dòng)減少消費(fèi)線程數(shù),通過(guò)DefaultMQPushConsumer.updateCorePoolSize方法可以調(diào)用內(nèi)部的setCorePoolSize設(shè)置多線程核心線程數(shù),需要的朋友可以參考下

背景

Rocketmq 消費(fèi)者在高峰期希望手動(dòng)減少消費(fèi)線程數(shù),通過(guò)DefaultMQPushConsumer.updateCorePoolSize方法可以調(diào)用內(nèi)部的ThreadPoolExecutor.setCorePoolSize設(shè)置多線程核心線程數(shù)。

那么是否能夠通過(guò)調(diào)整參數(shù)動(dòng)態(tài)調(diào)整Rocketmq消費(fèi)者呢。

結(jié)論

  • 多線程ThreadPoolExecutor.setCorePoolSize可以修改核心線程數(shù),但是減少核心線程數(shù)不一定生效
  • 核心線程銷(xiāo)毀的前提是至少在keepAliveTime內(nèi)沒(méi)有新的任務(wù)提交

動(dòng)態(tài)調(diào)整消費(fèi)線程實(shí)現(xiàn)方案

  • 可以通過(guò)調(diào)整核心線程數(shù)減少RocketMQ 消費(fèi)線程數(shù)
    • 先掛起消費(fèi)者consumer.suspend()
    • 調(diào)用consumer.updateCorePoolSize更新核心線程數(shù)
    • 然后休眠至少1分鐘以上,等任務(wù)全部消費(fèi)完成,1分鐘是基于ConsumeMessageConcurrentlyService中創(chuàng)建線程池默認(rèn)參數(shù)1000*60 TimeUnit.MILLISECONDS得到的, 還需要加上本地隊(duì)列堆積任務(wù)消費(fèi)完成時(shí)間
    • 恢復(fù)消費(fèi)者consumer.resume()
consumer.suspend();
consumer.updateCorePoolSize(3);
try {
	TimeUnit.SECONDS.sleep(65000L);
 } catch (Exception e) {
	log.error("InterruptException", e);
}
consumer.resume();
  • 增加消費(fèi)線程數(shù),直接通過(guò)consumer.updateCorePoolSize方法就可以實(shí)現(xiàn)

測(cè)試

ThreadTest.java

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ThreadTest {
   public static void main(String[] args) throws InterruptedException {
       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
               10,
               50,
               1000 * 60,
               TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<>(),
               new ThreadFactoryImpl("test" + "_" + "ConsumeMessageThread_"));
       for (int i = 0; i < 1000; i++) {
           threadPoolExecutor.submit(new Runnable() {
               @SneakyThrows
               @Override
               public void run() {
                   Thread.sleep(5);
                   log.info("hello");
               }
           });
       }
       log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
       Thread.sleep(10000L);
       threadPoolExecutor.setCorePoolSize(3);
       log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
       // Thread.sleep(1000*60); // 如果休眠時(shí)間大于KeepAliveTime將會(huì)只有3個(gè)線程
       Thread.sleep(1000L);  // 休眠時(shí)間不夠時(shí)仍然有10個(gè)線程
       for (int i = 0; i < 1000; i++) {
           threadPoolExecutor.submit(new Runnable() {
               @SneakyThrows
               @Override
               public void run() {
                   Thread.sleep(10);
                   log.info("hello2");
               }
           });
       }
   }
}

實(shí)驗(yàn)證明setCorePoolSize在設(shè)置為3個(gè)線程以后,在第二批任務(wù)提交還是有10個(gè)線程在工作, 但是如果在第二批任務(wù)提交前休眠時(shí)間大于keepAliveTime以后則只會(huì)有3個(gè)工作線程

原理

源碼部分主要看是ThreadPoolExecutor中的workers變量,setCorePoolSize()方法,runWorker()方法,getTask()方法

  • 一個(gè)work在執(zhí)行runWorker()方法時(shí)只有在獲取任務(wù)getTask()方法返回null以后才會(huì)終止循環(huán),然后銷(xiāo)毀
  • getTask()方法從任務(wù)隊(duì)列中拿取任務(wù)等待keepAliveTime超時(shí)以后才會(huì)有可能返回null
    // 工作workers, work只有在獲取任務(wù)超時(shí)以后才會(huì)從workers中刪除
    private final HashSet<Worker> workers = new HashSet<Worker>();
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
        // 減少核心線程數(shù)以后進(jìn)入interruptIdleWorkers方法
            interruptIdleWorkers();
        else if (delta > 0) {
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
     private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 在interruptIdleWorkers方法中只是將work的線程中斷,并沒(méi)有從workers刪除
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 重點(diǎn)是getTask()方法獲取task失敗才會(huì)中斷循環(huán)
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 超時(shí)以后進(jìn)入這里的if返回null然后work才會(huì)被銷(xiāo)毀
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
   private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 這里才真正將worker刪除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

到此這篇關(guān)于ThreadPoolExecutor核心線程數(shù)和RocketMQ消費(fèi)線程調(diào)整詳解的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)線程調(diào)整內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 接口簽名怎么用Java實(shí)現(xiàn)

    接口簽名怎么用Java實(shí)現(xiàn)

    今天帶大家學(xué)習(xí)java的相關(guān)知識(shí),文章圍繞怎么用Java實(shí)現(xiàn)接口簽名展開(kāi),文中有非常詳細(xì)的代碼示例及介紹,需要的朋友可以參考下
    2021-06-06
  • SpringMVC全局異常處理的三種方式

    SpringMVC全局異常處理的三種方式

    這篇文章主要介紹了SpringMVC全局異常處理的三種方式,幫助大家更好的理解和使用springmvc,感興趣的朋友可以了解下
    2021-01-01
  • Java多線程連續(xù)打印abc實(shí)現(xiàn)方法詳解

    Java多線程連續(xù)打印abc實(shí)現(xiàn)方法詳解

    這篇文章主要介紹了Java多線程連續(xù)打印abc實(shí)現(xiàn)方法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • 解讀nacos獲取配置文件的大致過(guò)程

    解讀nacos獲取配置文件的大致過(guò)程

    這篇文章主要介紹了nacos獲取配置文件的大致過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • java 線程池的實(shí)現(xiàn)方法

    java 線程池的實(shí)現(xiàn)方法

    在本篇文章里小編給大家整理了關(guān)于java 線程池的實(shí)現(xiàn)方法,有興趣的朋友們可以學(xué)習(xí)參考下。
    2020-02-02
  • Spring中的循環(huán)依賴問(wèn)題

    Spring中的循環(huán)依賴問(wèn)題

    在Spring框架中,循環(huán)依賴是指兩個(gè)或多個(gè)Bean相互依賴,這導(dǎo)致在Bean的創(chuàng)建過(guò)程中出現(xiàn)依賴死鎖,為了解決這一問(wèn)題,Spring引入了三級(jí)緩存機(jī)制,包括singletonObjects、earlySingletonObjects和singletonFactories
    2024-09-09
  • IntelliJ IDEA 如何徹底刪除項(xiàng)目的步驟

    IntelliJ IDEA 如何徹底刪除項(xiàng)目的步驟

    本篇文章主要介紹了IntelliJ IDEA 如何徹底刪除項(xiàng)目的步驟,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-11-11
  • JAVA LinkedList和ArrayList的使用及性能分析

    JAVA LinkedList和ArrayList的使用及性能分析

    JAVA LinkedList和ArrayList的使用及性能分析,這篇文章也是以JAVA List的總結(jié)。
    2013-11-11
  • Java中常用的四種引用類(lèi)型詳解

    Java中常用的四種引用類(lèi)型詳解

    Java中常用的四種引用類(lèi)型,分別為,強(qiáng)引用、軟引用、弱引用以及虛引用,這篇文章主要為大家介紹了這四種引用的用法,需要的可以參考一下
    2023-06-06
  • Java?Selenide?簡(jiǎn)介與用法

    Java?Selenide?簡(jiǎn)介與用法

    Selenium?是目前用的最廣泛的Web?UI?自動(dòng)化測(cè)試框架,本文給大家介紹下Java?Selenide使用,感興趣的朋友一起看看吧
    2022-01-01

最新評(píng)論