RocketMQ線程池創(chuàng)建實現原理詳解
前言
大家好,我是小郭,今天主要來和大家聊一聊RocketMQ中的線程池是如何創(chuàng)建的,如何設置線程池數量,同時也可以從中去學習到一些線程池的實踐和需要注意的一些細節(jié)。
RocketMQ在哪些地方使用到了線程池?
在RocketMQ中存在了大量的對線程池的使用,從消息的生產到投遞Broker中,到最后的消息消費每一個環(huán)節(jié)中都大量使用到線程池的地方,下面我們拿出幾個不同類型的線程池來看一看。
在 NameServer的路由注冊和剔除中,多次使用到了定時線程池
定時線程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread"));
// 定時任務 每10s掃描一次Broker,移除失活Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//定時任務,每隔30s向集群中所有NameServer發(fā)送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
?
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
線程池newFixedThreadPool
FixedThreadPool常用于創(chuàng)建一個固定大小的線程池,
它的特點就是核心線程數量與最大線程數量一致,采用無界的阻塞隊列 LinkedBlockingQueue,并且沒有設置隊列的大小默認是Integer.MAX_VALUE,適用于負載較重的場景
private ExecutorService remotingExecutor;
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 用來設置接收到消息后的處理方法
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
消息發(fā)送初始化默認異步發(fā)送者線程池
核心線程數與最大線程數設置均為 Runtime.getRuntime().availableProcessors() ,可用的計算資源
阻塞隊列設置為一個初始化50000長度的阻塞隊列
keepAliveTime設置60s,超過則時間空閑的線程將被終止
private final ExecutorService defaultAsyncSenderExecutor;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
消費端拉取消息線程池
我們重點來看一下消費端的線程池是如何創(chuàng)建,它可以說是整個RocketMQ中最關鍵的一個線程池
為了提高消費速度,我們通常有兩種方式來提高消費并行度
- 同一個 ConsumerGroup 下,通過增加 Consumer 實例數量來提高并行度
- 提高單個 Consumer 的消費并行線程,通過修改參數 consumeThreadMin、consumeThreadMax實現。
如何創(chuàng)建?
在消息監(jiān)聽的時候,利用線程池進行不斷的拉取消息

提交消費請求,消息提交到內部的線程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
參數設置
創(chuàng)建內部線程池,核心參數核心線程數和最大線程數,主要是根據配置來進行設置
設置線程池名稱以 ConsumeMessageThread_ 開頭的,利于排查問題
阻塞隊列是一個無界的阻塞隊列LinkedBlockingQueue
private final BlockingQueue<Runnable> consumeRequestQueue;
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
通過RocketMQ的源碼,我們看到 consumeExecutor 線程池的創(chuàng)建也是非常簡單的
如果想要修改線程池參數,需要注意什么?
根據線程池的原理我們知道,只有阻塞隊列為滿的情況下,不會創(chuàng)建臨時線程

所以線程池內部持有的隊列為一個無界隊列,導致 consumeThreadMax 大于 consumeThreadMin,線程個數最大也只能 consumeThreadMin 個線程數量
什么時候需要修改?
在正常的業(yè)務場景中,啟動應用之后,我們就不會再修改消費者線程數,但有可能突發(fā)業(yè)務高峰導致消息堆積,這時候我們就需要調整單個 Consumer 的消費并行線程數。
如何修改線程數?
- 修改線程池后,重新啟動消費者,缺點是參數不易評估,隨著業(yè)務的并發(fā)提升,需要頻繁的重啟服務來更改線程數,這勢必會帶來一定的造成影響。
- 官方也為我們提供了修改線程數的方法,當更新的線程數大于0且小于 Short.MAX_VALUE 且小于最大線程數,則更新核心線程數。
JDK允許線程池使用方通過ThreadPoolExecutor的實例來動態(tài)設置線程池的核心策略
@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
}
這兩種方式都存在一定的痛點
- 線程數量隨著業(yè)務的變動,需要修改代碼
- 在springBoot和SpringCloud Stream下,對線程池參數變更不是很友好
- 不能通過管理界面,直接動態(tài)修改線程池參數
針對上面的痛點問題,我們可以考慮封裝線程池動態(tài)參數調整,首先肯定原來代碼是毫無侵入性的,
同時通過管理頁面對不同消費者組的線程池進行管理自由的隨著業(yè)務波動進行平滑修改,降低線程池參數修改的成本。
以上就是RocketMQ線程池創(chuàng)建實現原理詳解的詳細內容,更多關于RocketMQ線程池創(chuàng)建的資料請關注腳本之家其它相關文章!
相關文章
SpringMVC?HttpMessageConverter報文信息轉換器
這篇文章主要為大家介紹了SpringMVC?HttpMessageConverter報文信息轉換器,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05
@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明
這篇文章主要介紹了@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
java基礎之初始化ArrayList時直接賦值的4種方式總結
ArrayList是Java中的一個類,它是Java集合框架中的一部分,用于實現動態(tài)數組,下面這篇文章主要給大家介紹了關于java基礎之初始化ArrayList時直接賦值的4種方式,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-07-07

