欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ線程池創(chuàng)建實(shí)現(xiàn)原理詳解

 更新時(shí)間:2022年12月15日 08:50:41   作者:小郭的技術(shù)筆記  
這篇文章主要為大家介紹了RocketMQ線程池創(chuàng)建實(shí)現(xiàn)原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

大家好,我是小郭,今天主要來和大家聊一聊RocketMQ中的線程池是如何創(chuàng)建的,如何設(shè)置線程池?cái)?shù)量,同時(shí)也可以從中去學(xué)習(xí)到一些線程池的實(shí)踐和需要注意的一些細(xì)節(jié)。

RocketMQ在哪些地方使用到了線程池?

在RocketMQ中存在了大量的對(duì)線程池的使用,從消息的生產(chǎn)到投遞Broker中,到最后的消息消費(fèi)每一個(gè)環(huán)節(jié)中都大量使用到線程池的地方,下面我們拿出幾個(gè)不同類型的線程池來看一看。

在 NameServer的路由注冊和剔除中,多次使用到了定時(shí)線程池

定時(shí)線程池

private final ScheduledExecutorService scheduledExecutorService =
	Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
		"NSScheduledThread"));
// 定時(shí)任務(wù) 每10s掃描一次Broker,移除失活Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	?
	@Override
	public void run() {
		NamesrvController.this.routeInfoManager.scanNotActiveBroker();
	}
}, 5, 10, TimeUnit.SECONDS);
//定時(shí)任務(wù),每隔30s向集群中所有NameServer發(fā)送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
	?
	@Override
	public void run() {
		try {
			BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
		} catch (Throwable e) {
			log.error("registerBrokerAll Exception", e);
		}
	}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

線程池newFixedThreadPool

FixedThreadPool常用于創(chuàng)建一個(gè)固定大小的線程池,

它的特點(diǎn)就是核心線程數(shù)量與最大線程數(shù)量一致,采用無界的阻塞隊(duì)列 LinkedBlockingQueue,并且沒有設(shè)置隊(duì)列的大小默認(rèn)是Integer.MAX_VALUE,適用于負(fù)載較重的場景

private ExecutorService remotingExecutor;
this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 用來設(shè)置接收到消息后的處理方法
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);

消息發(fā)送初始化默認(rèn)異步發(fā)送者線程池

核心線程數(shù)與最大線程數(shù)設(shè)置均為 Runtime.getRuntime().availableProcessors() ,可用的計(jì)算資源

阻塞隊(duì)列設(shè)置為一個(gè)初始化50000長度的阻塞隊(duì)列

keepAliveTime設(shè)置60s,超過則時(shí)間空閑的線程將被終止

private final ExecutorService defaultAsyncSenderExecutor;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
	Runtime.getRuntime().availableProcessors(),
	Runtime.getRuntime().availableProcessors(),
	1000 * 60,
	TimeUnit.MILLISECONDS,
	this.asyncSenderThreadPoolQueue,
	new ThreadFactory() {
		private AtomicInteger threadIndex = new AtomicInteger(0);
		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
		}
	});

消費(fèi)端拉取消息線程池

我們重點(diǎn)來看一下消費(fèi)端的線程池是如何創(chuàng)建,它可以說是整個(gè)RocketMQ中最關(guān)鍵的一個(gè)線程池

為了提高消費(fèi)速度,我們通常有兩種方式來提高消費(fèi)并行度

  • 同一個(gè) ConsumerGroup 下,通過增加 Consumer 實(shí)例數(shù)量來提高并行度
  • 提高單個(gè) Consumer 的消費(fèi)并行線程,通過修改參數(shù) consumeThreadMin、consumeThreadMax實(shí)現(xiàn)。

如何創(chuàng)建?

在消息監(jiān)聽的時(shí)候,利用線程池進(jìn)行不斷的拉取消息

提交消費(fèi)請(qǐng)求,消息提交到內(nèi)部的線程池

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

參數(shù)設(shè)置

創(chuàng)建內(nèi)部線程池,核心參數(shù)核心線程數(shù)和最大線程數(shù),主要是根據(jù)配置來進(jìn)行設(shè)置

設(shè)置線程池名稱以 ConsumeMessageThread_ 開頭的,利于排查問題

阻塞隊(duì)列是一個(gè)無界的阻塞隊(duì)列LinkedBlockingQueue

private final BlockingQueue<Runnable> consumeRequestQueue;
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl(consumeThreadPrefix));

通過RocketMQ的源碼,我們看到 consumeExecutor 線程池的創(chuàng)建也是非常簡單的

如果想要修改線程池參數(shù),需要注意什么?

根據(jù)線程池的原理我們知道,只有阻塞隊(duì)列為滿的情況下,不會(huì)創(chuàng)建臨時(shí)線程

所以線程池內(nèi)部持有的隊(duì)列為一個(gè)無界隊(duì)列,導(dǎo)致 consumeThreadMax 大于 consumeThreadMin,線程個(gè)數(shù)最大也只能 consumeThreadMin 個(gè)線程數(shù)量

什么時(shí)候需要修改?

在正常的業(yè)務(wù)場景中,啟動(dòng)應(yīng)用之后,我們就不會(huì)再修改消費(fèi)者線程數(shù),但有可能突發(fā)業(yè)務(wù)高峰導(dǎo)致消息堆積,這時(shí)候我們就需要調(diào)整單個(gè) Consumer 的消費(fèi)并行線程數(shù)。

如何修改線程數(shù)?

  • 修改線程池后,重新啟動(dòng)消費(fèi)者,缺點(diǎn)是參數(shù)不易評(píng)估,隨著業(yè)務(wù)的并發(fā)提升,需要頻繁的重啟服務(wù)來更改線程數(shù),這勢必會(huì)帶來一定的造成影響。
  • 官方也為我們提供了修改線程數(shù)的方法,當(dāng)更新的線程數(shù)大于0且小于 Short.MAX_VALUE 且小于最大線程數(shù),則更新核心線程數(shù)。

JDK允許線程池使用方通過ThreadPoolExecutor的實(shí)例來動(dòng)態(tài)設(shè)置線程池的核心策略

@Override
public void updateCorePoolSize(int corePoolSize) {
    if (corePoolSize > 0
        && corePoolSize <= Short.MAX_VALUE
        && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }
}

這兩種方式都存在一定的痛點(diǎn)

  • 線程數(shù)量隨著業(yè)務(wù)的變動(dòng),需要修改代碼
  • 在springBoot和SpringCloud Stream下,對(duì)線程池參數(shù)變更不是很友好
  • 不能通過管理界面,直接動(dòng)態(tài)修改線程池參數(shù)

針對(duì)上面的痛點(diǎn)問題,我們可以考慮封裝線程池動(dòng)態(tài)參數(shù)調(diào)整,首先肯定原來代碼是毫無侵入性的,

同時(shí)通過管理頁面對(duì)不同消費(fèi)者組的線程池進(jìn)行管理自由的隨著業(yè)務(wù)波動(dòng)進(jìn)行平滑修改,降低線程池參數(shù)修改的成本。

以上就是RocketMQ線程池創(chuàng)建實(shí)現(xiàn)原理詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ線程池創(chuàng)建的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java實(shí)現(xiàn)最小高度樹

    Java實(shí)現(xiàn)最小高度樹

    本文主要介紹了Java實(shí)現(xiàn)最小高度樹,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-04-04
  • java方法重寫時(shí)需要注意的問題

    java方法重寫時(shí)需要注意的問題

    大家好,本篇文章主要講的是java方法重寫時(shí)需要注意的問題,感興趣的同學(xué)趕快來看一看吧,對(duì)你有幫助的話記得收藏一下,方便下次瀏覽
    2021-12-12
  • SpringMVC?HttpMessageConverter報(bào)文信息轉(zhuǎn)換器

    SpringMVC?HttpMessageConverter報(bào)文信息轉(zhuǎn)換器

    這篇文章主要為大家介紹了SpringMVC?HttpMessageConverter報(bào)文信息轉(zhuǎn)換器,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • 如何使用idea里面自帶的翻譯插件

    如何使用idea里面自帶的翻譯插件

    這篇文章主要介紹了idea里面自帶的翻譯插件,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-11-11
  • SpringSecurityOAuth2實(shí)現(xiàn)微信授權(quán)登錄

    SpringSecurityOAuth2實(shí)現(xiàn)微信授權(quán)登錄

    微信的登錄功能是用戶注冊和使用微信的必經(jīng)之路之一,而微信授權(quán)登錄更是方便了用戶的登錄操作,本文主要介紹了SpringSecurityOAuth2實(shí)現(xiàn)微信授權(quán)登錄,感興趣的可以了解一下
    2023-09-09
  • Java forEach對(duì)原數(shù)組的操作過程

    Java forEach對(duì)原數(shù)組的操作過程

    forEach對(duì)于基本數(shù)據(jù)類型,是直接賦值,對(duì)于引用數(shù)據(jù)類型,是引用地址值,forEach遍歷時(shí),是創(chuàng)建的臨時(shí)變量,引用的數(shù)據(jù)地址,本文給大家介紹Java forEach對(duì)原數(shù)組的操作過程,感興趣的朋友一起看看吧
    2024-02-02
  • java實(shí)現(xiàn)的順時(shí)針/逆時(shí)針打印矩陣操作示例

    java實(shí)現(xiàn)的順時(shí)針/逆時(shí)針打印矩陣操作示例

    這篇文章主要介紹了java實(shí)現(xiàn)的順時(shí)針/逆時(shí)針打印矩陣操作,涉及java基于數(shù)組的矩陣存儲(chǔ)、遍歷、打印輸出等相關(guān)操作技巧,需要的朋友可以參考下
    2019-12-12
  • @CacheEvict中的allEntries與beforeInvocation的區(qū)別說明

    @CacheEvict中的allEntries與beforeInvocation的區(qū)別說明

    這篇文章主要介紹了@CacheEvict中的allEntries與beforeInvocation的區(qū)別說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • java基礎(chǔ)之初始化ArrayList時(shí)直接賦值的4種方式總結(jié)

    java基礎(chǔ)之初始化ArrayList時(shí)直接賦值的4種方式總結(jié)

    ArrayList是Java中的一個(gè)類,它是Java集合框架中的一部分,用于實(shí)現(xiàn)動(dòng)態(tài)數(shù)組,下面這篇文章主要給大家介紹了關(guān)于java基礎(chǔ)之初始化ArrayList時(shí)直接賦值的4種方式,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-07-07
  • 解決java中的父類私有成員變量的繼承問題

    解決java中的父類私有成員變量的繼承問題

    這篇文章主要介紹了解決java中的父類私有成員變量的繼承問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01

最新評(píng)論