欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ線程池創(chuàng)建實現(xiàn)原理詳解

 更新時間:2022年12月15日 08:50:41   作者:小郭的技術筆記  
這篇文章主要為大家介紹了RocketMQ線程池創(chuàng)建實現(xiàn)原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

大家好,我是小郭,今天主要來和大家聊一聊RocketMQ中的線程池是如何創(chuàng)建的,如何設置線程池數(shù)量,同時也可以從中去學習到一些線程池的實踐和需要注意的一些細節(jié)。

RocketMQ在哪些地方使用到了線程池?

在RocketMQ中存在了大量的對線程池的使用,從消息的生產到投遞Broker中,到最后的消息消費每一個環(huán)節(jié)中都大量使用到線程池的地方,下面我們拿出幾個不同類型的線程池來看一看。

在 NameServer的路由注冊和剔除中,多次使用到了定時線程池

定時線程池

private final ScheduledExecutorService scheduledExecutorService =
	Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
		"NSScheduledThread"));
// 定時任務 每10s掃描一次Broker,移除失活Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	?
	@Override
	public void run() {
		NamesrvController.this.routeInfoManager.scanNotActiveBroker();
	}
}, 5, 10, TimeUnit.SECONDS);
//定時任務,每隔30s向集群中所有NameServer發(fā)送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	?
	@Override
	public void run() {
		try {
			BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
		} catch (Throwable e) {
			log.error("registerBrokerAll Exception", e);
		}
	}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

線程池newFixedThreadPool

FixedThreadPool常用于創(chuàng)建一個固定大小的線程池,

它的特點就是核心線程數(shù)量與最大線程數(shù)量一致,采用無界的阻塞隊列 LinkedBlockingQueue,并且沒有設置隊列的大小默認是Integer.MAX_VALUE,適用于負載較重的場景

private ExecutorService remotingExecutor;
this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 用來設置接收到消息后的處理方法
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);

消息發(fā)送初始化默認異步發(fā)送者線程池

核心線程數(shù)與最大線程數(shù)設置均為 Runtime.getRuntime().availableProcessors() ,可用的計算資源

阻塞隊列設置為一個初始化50000長度的阻塞隊列

keepAliveTime設置60s,超過則時間空閑的線程將被終止

private final ExecutorService defaultAsyncSenderExecutor;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
	Runtime.getRuntime().availableProcessors(),
	Runtime.getRuntime().availableProcessors(),
	1000 * 60,
	TimeUnit.MILLISECONDS,
	this.asyncSenderThreadPoolQueue,
	new ThreadFactory() {
		private AtomicInteger threadIndex = new AtomicInteger(0);
		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
		}
	});

消費端拉取消息線程池

我們重點來看一下消費端的線程池是如何創(chuàng)建,它可以說是整個RocketMQ中最關鍵的一個線程池

為了提高消費速度,我們通常有兩種方式來提高消費并行度

  • 同一個 ConsumerGroup 下,通過增加 Consumer 實例數(shù)量來提高并行度
  • 提高單個 Consumer 的消費并行線程,通過修改參數(shù) consumeThreadMin、consumeThreadMax實現(xiàn)。

如何創(chuàng)建?

在消息監(jiān)聽的時候,利用線程池進行不斷的拉取消息

提交消費請求,消息提交到內部的線程池

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

參數(shù)設置

創(chuàng)建內部線程池,核心參數(shù)核心線程數(shù)和最大線程數(shù),主要是根據(jù)配置來進行設置

設置線程池名稱以 ConsumeMessageThread_ 開頭的,利于排查問題

阻塞隊列是一個無界的阻塞隊列LinkedBlockingQueue

private final BlockingQueue<Runnable> consumeRequestQueue;
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl(consumeThreadPrefix));

通過RocketMQ的源碼,我們看到 consumeExecutor 線程池的創(chuàng)建也是非常簡單的

如果想要修改線程池參數(shù),需要注意什么?

根據(jù)線程池的原理我們知道,只有阻塞隊列為滿的情況下,不會創(chuàng)建臨時線程

所以線程池內部持有的隊列為一個無界隊列,導致 consumeThreadMax 大于 consumeThreadMin,線程個數(shù)最大也只能 consumeThreadMin 個線程數(shù)量

什么時候需要修改?

在正常的業(yè)務場景中,啟動應用之后,我們就不會再修改消費者線程數(shù),但有可能突發(fā)業(yè)務高峰導致消息堆積,這時候我們就需要調整單個 Consumer 的消費并行線程數(shù)。

如何修改線程數(shù)?

  • 修改線程池后,重新啟動消費者,缺點是參數(shù)不易評估,隨著業(yè)務的并發(fā)提升,需要頻繁的重啟服務來更改線程數(shù),這勢必會帶來一定的造成影響。
  • 官方也為我們提供了修改線程數(shù)的方法,當更新的線程數(shù)大于0且小于 Short.MAX_VALUE 且小于最大線程數(shù),則更新核心線程數(shù)。

JDK允許線程池使用方通過ThreadPoolExecutor的實例來動態(tài)設置線程池的核心策略

@Override
public void updateCorePoolSize(int corePoolSize) {
    if (corePoolSize > 0
        && corePoolSize <= Short.MAX_VALUE
        && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }
}

這兩種方式都存在一定的痛點

  • 線程數(shù)量隨著業(yè)務的變動,需要修改代碼
  • 在springBoot和SpringCloud Stream下,對線程池參數(shù)變更不是很友好
  • 不能通過管理界面,直接動態(tài)修改線程池參數(shù)

針對上面的痛點問題,我們可以考慮封裝線程池動態(tài)參數(shù)調整,首先肯定原來代碼是毫無侵入性的,

同時通過管理頁面對不同消費者組的線程池進行管理自由的隨著業(yè)務波動進行平滑修改,降低線程池參數(shù)修改的成本。

以上就是RocketMQ線程池創(chuàng)建實現(xiàn)原理詳解的詳細內容,更多關于RocketMQ線程池創(chuàng)建的資料請關注腳本之家其它相關文章!

相關文章

  • Java實現(xiàn)最小高度樹

    Java實現(xiàn)最小高度樹

    本文主要介紹了Java實現(xiàn)最小高度樹,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-04-04
  • java方法重寫時需要注意的問題

    java方法重寫時需要注意的問題

    大家好,本篇文章主要講的是java方法重寫時需要注意的問題,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下,方便下次瀏覽
    2021-12-12
  • SpringMVC?HttpMessageConverter報文信息轉換器

    SpringMVC?HttpMessageConverter報文信息轉換器

    這篇文章主要為大家介紹了SpringMVC?HttpMessageConverter報文信息轉換器,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-05-05
  • 如何使用idea里面自帶的翻譯插件

    如何使用idea里面自帶的翻譯插件

    這篇文章主要介紹了idea里面自帶的翻譯插件,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • SpringSecurityOAuth2實現(xiàn)微信授權登錄

    SpringSecurityOAuth2實現(xiàn)微信授權登錄

    微信的登錄功能是用戶注冊和使用微信的必經(jīng)之路之一,而微信授權登錄更是方便了用戶的登錄操作,本文主要介紹了SpringSecurityOAuth2實現(xiàn)微信授權登錄,感興趣的可以了解一下
    2023-09-09
  • Java forEach對原數(shù)組的操作過程

    Java forEach對原數(shù)組的操作過程

    forEach對于基本數(shù)據(jù)類型,是直接賦值,對于引用數(shù)據(jù)類型,是引用地址值,forEach遍歷時,是創(chuàng)建的臨時變量,引用的數(shù)據(jù)地址,本文給大家介紹Java forEach對原數(shù)組的操作過程,感興趣的朋友一起看看吧
    2024-02-02
  • java實現(xiàn)的順時針/逆時針打印矩陣操作示例

    java實現(xiàn)的順時針/逆時針打印矩陣操作示例

    這篇文章主要介紹了java實現(xiàn)的順時針/逆時針打印矩陣操作,涉及java基于數(shù)組的矩陣存儲、遍歷、打印輸出等相關操作技巧,需要的朋友可以參考下
    2019-12-12
  • @CacheEvict中的allEntries與beforeInvocation的區(qū)別說明

    @CacheEvict中的allEntries與beforeInvocation的區(qū)別說明

    這篇文章主要介紹了@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • java基礎之初始化ArrayList時直接賦值的4種方式總結

    java基礎之初始化ArrayList時直接賦值的4種方式總結

    ArrayList是Java中的一個類,它是Java集合框架中的一部分,用于實現(xiàn)動態(tài)數(shù)組,下面這篇文章主要給大家介紹了關于java基礎之初始化ArrayList時直接賦值的4種方式,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-07-07
  • 解決java中的父類私有成員變量的繼承問題

    解決java中的父類私有成員變量的繼承問題

    這篇文章主要介紹了解決java中的父類私有成員變量的繼承問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01

最新評論