RocketMQ線程池創(chuàng)建實現(xiàn)原理詳解
前言
大家好,我是小郭,今天主要來和大家聊一聊RocketMQ中的線程池是如何創(chuàng)建的,如何設置線程池數(shù)量,同時也可以從中去學習到一些線程池的實踐和需要注意的一些細節(jié)。
RocketMQ在哪些地方使用到了線程池?
在RocketMQ中存在了大量的對線程池的使用,從消息的生產到投遞Broker中,到最后的消息消費每一個環(huán)節(jié)中都大量使用到線程池的地方,下面我們拿出幾個不同類型的線程池來看一看。
在 NameServer的路由注冊和剔除中,多次使用到了定時線程池
定時線程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread"));
// 定時任務 每10s掃描一次Broker,移除失活Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ? @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定時任務,每隔30s向集群中所有NameServer發(fā)送心跳包 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { ? @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
線程池newFixedThreadPool
FixedThreadPool常用于創(chuàng)建一個固定大小的線程池,
它的特點就是核心線程數(shù)量與最大線程數(shù)量一致,采用無界的阻塞隊列 LinkedBlockingQueue,并且沒有設置隊列的大小默認是Integer.MAX_VALUE,適用于負載較重的場景
private ExecutorService remotingExecutor; this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 用來設置接收到消息后的處理方法 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
消息發(fā)送初始化默認異步發(fā)送者線程池
核心線程數(shù)與最大線程數(shù)設置均為 Runtime.getRuntime().availableProcessors() ,可用的計算資源
阻塞隊列設置為一個初始化50000長度的阻塞隊列
keepAliveTime設置60s,超過則時間空閑的線程將被終止
private final ExecutorService defaultAsyncSenderExecutor; private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } });
消費端拉取消息線程池
我們重點來看一下消費端的線程池是如何創(chuàng)建,它可以說是整個RocketMQ中最關鍵的一個線程池
為了提高消費速度,我們通常有兩種方式來提高消費并行度
- 同一個 ConsumerGroup 下,通過增加 Consumer 實例數(shù)量來提高并行度
- 提高單個 Consumer 的消費并行線程,通過修改參數(shù) consumeThreadMin、consumeThreadMax實現(xiàn)。
如何創(chuàng)建?
在消息監(jiān)聽的時候,利用線程池進行不斷的拉取消息
提交消費請求,消息提交到內部的線程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
參數(shù)設置
創(chuàng)建內部線程池,核心參數(shù)核心線程數(shù)和最大線程數(shù),主要是根據(jù)配置來進行設置
設置線程池名稱以 ConsumeMessageThread_ 開頭的,利于排查問題
阻塞隊列是一個無界的阻塞隊列LinkedBlockingQueue
private final BlockingQueue<Runnable> consumeRequestQueue; this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(consumeThreadPrefix));
通過RocketMQ的源碼,我們看到 consumeExecutor 線程池的創(chuàng)建也是非常簡單的
如果想要修改線程池參數(shù),需要注意什么?
根據(jù)線程池的原理我們知道,只有阻塞隊列為滿的情況下,不會創(chuàng)建臨時線程
所以線程池內部持有的隊列為一個無界隊列,導致 consumeThreadMax 大于 consumeThreadMin,線程個數(shù)最大也只能 consumeThreadMin 個線程數(shù)量
什么時候需要修改?
在正常的業(yè)務場景中,啟動應用之后,我們就不會再修改消費者線程數(shù),但有可能突發(fā)業(yè)務高峰導致消息堆積,這時候我們就需要調整單個 Consumer 的消費并行線程數(shù)。
如何修改線程數(shù)?
- 修改線程池后,重新啟動消費者,缺點是參數(shù)不易評估,隨著業(yè)務的并發(fā)提升,需要頻繁的重啟服務來更改線程數(shù),這勢必會帶來一定的造成影響。
- 官方也為我們提供了修改線程數(shù)的方法,當更新的線程數(shù)大于0且小于 Short.MAX_VALUE 且小于最大線程數(shù),則更新核心線程數(shù)。
JDK允許線程池使用方通過ThreadPoolExecutor的實例來動態(tài)設置線程池的核心策略
@Override public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } }
這兩種方式都存在一定的痛點
- 線程數(shù)量隨著業(yè)務的變動,需要修改代碼
- 在springBoot和SpringCloud Stream下,對線程池參數(shù)變更不是很友好
- 不能通過管理界面,直接動態(tài)修改線程池參數(shù)
針對上面的痛點問題,我們可以考慮封裝線程池動態(tài)參數(shù)調整,首先肯定原來代碼是毫無侵入性的,
同時通過管理頁面對不同消費者組的線程池進行管理自由的隨著業(yè)務波動進行平滑修改,降低線程池參數(shù)修改的成本。
以上就是RocketMQ線程池創(chuàng)建實現(xiàn)原理詳解的詳細內容,更多關于RocketMQ線程池創(chuàng)建的資料請關注腳本之家其它相關文章!
相關文章
SpringMVC?HttpMessageConverter報文信息轉換器
這篇文章主要為大家介紹了SpringMVC?HttpMessageConverter報文信息轉換器,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05SpringSecurityOAuth2實現(xiàn)微信授權登錄
微信的登錄功能是用戶注冊和使用微信的必經(jīng)之路之一,而微信授權登錄更是方便了用戶的登錄操作,本文主要介紹了SpringSecurityOAuth2實現(xiàn)微信授權登錄,感興趣的可以了解一下2023-09-09@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明
這篇文章主要介紹了@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12java基礎之初始化ArrayList時直接賦值的4種方式總結
ArrayList是Java中的一個類,它是Java集合框架中的一部分,用于實現(xiàn)動態(tài)數(shù)組,下面這篇文章主要給大家介紹了關于java基礎之初始化ArrayList時直接賦值的4種方式,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-07-07