ThreadPoolExecutor核心線程數(shù)和RocketMQ消費線程調(diào)整詳解
背景
Rocketmq 消費者在高峰期希望手動減少消費線程數(shù),通過DefaultMQPushConsumer.updateCorePoolSize方法可以調(diào)用內(nèi)部的ThreadPoolExecutor.setCorePoolSize設置多線程核心線程數(shù)。
那么是否能夠通過調(diào)整參數(shù)動態(tài)調(diào)整Rocketmq消費者呢。
結(jié)論
- 多線程ThreadPoolExecutor.setCorePoolSize可以修改核心線程數(shù),但是減少核心線程數(shù)不一定生效
- 核心線程銷毀的前提是至少在keepAliveTime內(nèi)沒有新的任務提交
動態(tài)調(diào)整消費線程實現(xiàn)方案
- 可以通過調(diào)整核心線程數(shù)減少RocketMQ 消費線程數(shù)
- 先掛起消費者consumer.suspend()
- 調(diào)用consumer.updateCorePoolSize更新核心線程數(shù)
- 然后休眠至少1分鐘以上,等任務全部消費完成,1分鐘是基于ConsumeMessageConcurrentlyService中創(chuàng)建線程池默認參數(shù)1000*60 TimeUnit.MILLISECONDS得到的, 還需要加上本地隊列堆積任務消費完成時間
- 恢復消費者consumer.resume()
consumer.suspend();
consumer.updateCorePoolSize(3);
try {
TimeUnit.SECONDS.sleep(65000L);
} catch (Exception e) {
log.error("InterruptException", e);
}
consumer.resume();- 增加消費線程數(shù),直接通過consumer.updateCorePoolSize方法就可以實現(xiàn)
測試
ThreadTest.java
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ThreadTest {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
50,
1000 * 60,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryImpl("test" + "_" + "ConsumeMessageThread_"));
for (int i = 0; i < 1000; i++) {
threadPoolExecutor.submit(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(5);
log.info("hello");
}
});
}
log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
Thread.sleep(10000L);
threadPoolExecutor.setCorePoolSize(3);
log.info("coreSize: {}" ,threadPoolExecutor.getCorePoolSize());
// Thread.sleep(1000*60); // 如果休眠時間大于KeepAliveTime將會只有3個線程
Thread.sleep(1000L); // 休眠時間不夠時仍然有10個線程
for (int i = 0; i < 1000; i++) {
threadPoolExecutor.submit(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(10);
log.info("hello2");
}
});
}
}
}實驗證明setCorePoolSize在設置為3個線程以后,在第二批任務提交還是有10個線程在工作, 但是如果在第二批任務提交前休眠時間大于keepAliveTime以后則只會有3個工作線程
原理
源碼部分主要看是ThreadPoolExecutor中的workers變量,setCorePoolSize()方法,runWorker()方法,getTask()方法
- 一個work在執(zhí)行runWorker()方法時只有在獲取任務getTask()方法返回null以后才會終止循環(huán),然后銷毀
- getTask()方法從任務隊列中拿取任務等待keepAliveTime超時以后才會有可能返回null
// 工作workers, work只有在獲取任務超時以后才會從workers中刪除
private final HashSet<Worker> workers = new HashSet<Worker>();
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
// 減少核心線程數(shù)以后進入interruptIdleWorkers方法
interruptIdleWorkers();
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
// 在interruptIdleWorkers方法中只是將work的線程中斷,并沒有從workers刪除
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 重點是getTask()方法獲取task失敗才會中斷循環(huán)
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 超時以后進入這里的if返回null然后work才會被銷毀
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 這里才真正將worker刪除
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}到此這篇關于ThreadPoolExecutor核心線程數(shù)和RocketMQ消費線程調(diào)整詳解的文章就介紹到這了,更多相關RocketMQ消費線程調(diào)整內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java多線程連續(xù)打印abc實現(xiàn)方法詳解
這篇文章主要介紹了Java多線程連續(xù)打印abc實現(xiàn)方法詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-03-03
JAVA LinkedList和ArrayList的使用及性能分析
JAVA LinkedList和ArrayList的使用及性能分析,這篇文章也是以JAVA List的總結(jié)。2013-11-11

