RocketMQ線程池創(chuàng)建實(shí)現(xiàn)原理詳解
前言
大家好,我是小郭,今天主要來和大家聊一聊RocketMQ中的線程池是如何創(chuàng)建的,如何設(shè)置線程池?cái)?shù)量,同時(shí)也可以從中去學(xué)習(xí)到一些線程池的實(shí)踐和需要注意的一些細(xì)節(jié)。
RocketMQ在哪些地方使用到了線程池?
在RocketMQ中存在了大量的對(duì)線程池的使用,從消息的生產(chǎn)到投遞Broker中,到最后的消息消費(fèi)每一個(gè)環(huán)節(jié)中都大量使用到線程池的地方,下面我們拿出幾個(gè)不同類型的線程池來看一看。
在 NameServer的路由注冊和剔除中,多次使用到了定時(shí)線程池
定時(shí)線程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread"));
// 定時(shí)任務(wù) 每10s掃描一次Broker,移除失活Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ? @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定時(shí)任務(wù),每隔30s向集群中所有NameServer發(fā)送心跳包 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ? @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
線程池newFixedThreadPool
FixedThreadPool常用于創(chuàng)建一個(gè)固定大小的線程池,
它的特點(diǎn)就是核心線程數(shù)量與最大線程數(shù)量一致,采用無界的阻塞隊(duì)列 LinkedBlockingQueue,并且沒有設(shè)置隊(duì)列的大小默認(rèn)是Integer.MAX_VALUE,適用于負(fù)載較重的場景
private ExecutorService remotingExecutor; this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 用來設(shè)置接收到消息后的處理方法 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
消息發(fā)送初始化默認(rèn)異步發(fā)送者線程池
核心線程數(shù)與最大線程數(shù)設(shè)置均為 Runtime.getRuntime().availableProcessors() ,可用的計(jì)算資源
阻塞隊(duì)列設(shè)置為一個(gè)初始化50000長度的阻塞隊(duì)列
keepAliveTime設(shè)置60s,超過則時(shí)間空閑的線程將被終止
private final ExecutorService defaultAsyncSenderExecutor; private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } });
消費(fèi)端拉取消息線程池
我們重點(diǎn)來看一下消費(fèi)端的線程池是如何創(chuàng)建,它可以說是整個(gè)RocketMQ中最關(guān)鍵的一個(gè)線程池
為了提高消費(fèi)速度,我們通常有兩種方式來提高消費(fèi)并行度
- 同一個(gè) ConsumerGroup 下,通過增加 Consumer 實(shí)例數(shù)量來提高并行度
- 提高單個(gè) Consumer 的消費(fèi)并行線程,通過修改參數(shù) consumeThreadMin、consumeThreadMax實(shí)現(xiàn)。
如何創(chuàng)建?
在消息監(jiān)聽的時(shí)候,利用線程池進(jìn)行不斷的拉取消息
提交消費(fèi)請(qǐng)求,消息提交到內(nèi)部的線程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
參數(shù)設(shè)置
創(chuàng)建內(nèi)部線程池,核心參數(shù)核心線程數(shù)和最大線程數(shù),主要是根據(jù)配置來進(jìn)行設(shè)置
設(shè)置線程池名稱以 ConsumeMessageThread_ 開頭的,利于排查問題
阻塞隊(duì)列是一個(gè)無界的阻塞隊(duì)列LinkedBlockingQueue
private final BlockingQueue<Runnable> consumeRequestQueue; this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(consumeThreadPrefix));
通過RocketMQ的源碼,我們看到 consumeExecutor 線程池的創(chuàng)建也是非常簡單的
如果想要修改線程池參數(shù),需要注意什么?
根據(jù)線程池的原理我們知道,只有阻塞隊(duì)列為滿的情況下,不會(huì)創(chuàng)建臨時(shí)線程
所以線程池內(nèi)部持有的隊(duì)列為一個(gè)無界隊(duì)列,導(dǎo)致 consumeThreadMax 大于 consumeThreadMin,線程個(gè)數(shù)最大也只能 consumeThreadMin 個(gè)線程數(shù)量
什么時(shí)候需要修改?
在正常的業(yè)務(wù)場景中,啟動(dòng)應(yīng)用之后,我們就不會(huì)再修改消費(fèi)者線程數(shù),但有可能突發(fā)業(yè)務(wù)高峰導(dǎo)致消息堆積,這時(shí)候我們就需要調(diào)整單個(gè) Consumer 的消費(fèi)并行線程數(shù)。
如何修改線程數(shù)?
- 修改線程池后,重新啟動(dòng)消費(fèi)者,缺點(diǎn)是參數(shù)不易評(píng)估,隨著業(yè)務(wù)的并發(fā)提升,需要頻繁的重啟服務(wù)來更改線程數(shù),這勢必會(huì)帶來一定的造成影響。
- 官方也為我們提供了修改線程數(shù)的方法,當(dāng)更新的線程數(shù)大于0且小于 Short.MAX_VALUE 且小于最大線程數(shù),則更新核心線程數(shù)。
JDK允許線程池使用方通過ThreadPoolExecutor的實(shí)例來動(dòng)態(tài)設(shè)置線程池的核心策略
@Override public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } }
這兩種方式都存在一定的痛點(diǎn)
- 線程數(shù)量隨著業(yè)務(wù)的變動(dòng),需要修改代碼
- 在springBoot和SpringCloud Stream下,對(duì)線程池參數(shù)變更不是很友好
- 不能通過管理界面,直接動(dòng)態(tài)修改線程池參數(shù)
針對(duì)上面的痛點(diǎn)問題,我們可以考慮封裝線程池動(dòng)態(tài)參數(shù)調(diào)整,首先肯定原來代碼是毫無侵入性的,
同時(shí)通過管理頁面對(duì)不同消費(fèi)者組的線程池進(jìn)行管理自由的隨著業(yè)務(wù)波動(dòng)進(jìn)行平滑修改,降低線程池參數(shù)修改的成本。
以上就是RocketMQ線程池創(chuàng)建實(shí)現(xiàn)原理詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ線程池創(chuàng)建的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringMVC?HttpMessageConverter報(bào)文信息轉(zhuǎn)換器
這篇文章主要為大家介紹了SpringMVC?HttpMessageConverter報(bào)文信息轉(zhuǎn)換器,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05SpringSecurityOAuth2實(shí)現(xiàn)微信授權(quán)登錄
微信的登錄功能是用戶注冊和使用微信的必經(jīng)之路之一,而微信授權(quán)登錄更是方便了用戶的登錄操作,本文主要介紹了SpringSecurityOAuth2實(shí)現(xiàn)微信授權(quán)登錄,感興趣的可以了解一下2023-09-09Java forEach對(duì)原數(shù)組的操作過程
forEach對(duì)于基本數(shù)據(jù)類型,是直接賦值,對(duì)于引用數(shù)據(jù)類型,是引用地址值,forEach遍歷時(shí),是創(chuàng)建的臨時(shí)變量,引用的數(shù)據(jù)地址,本文給大家介紹Java forEach對(duì)原數(shù)組的操作過程,感興趣的朋友一起看看吧2024-02-02java實(shí)現(xiàn)的順時(shí)針/逆時(shí)針打印矩陣操作示例
這篇文章主要介紹了java實(shí)現(xiàn)的順時(shí)針/逆時(shí)針打印矩陣操作,涉及java基于數(shù)組的矩陣存儲(chǔ)、遍歷、打印輸出等相關(guān)操作技巧,需要的朋友可以參考下2019-12-12@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明
這篇文章主要介紹了@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12java基礎(chǔ)之初始化ArrayList時(shí)直接賦值的4種方式總結(jié)
ArrayList是Java中的一個(gè)類,它是Java集合框架中的一部分,用于實(shí)現(xiàn)動(dòng)態(tài)數(shù)組,下面這篇文章主要給大家介紹了關(guān)于java基礎(chǔ)之初始化ArrayList時(shí)直接賦值的4種方式,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-07-07