欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中RocketMQ的流量削峰詳解

 更新時(shí)間:2023年09月15日 09:09:13   作者:「已注銷」  
這篇文章主要介紹了Java中RocketMQ的流量削峰詳解,MQ的主要特點(diǎn)為解耦、異步、削峰,該文章主要記錄與分享個(gè)人在實(shí)際項(xiàng)目中的RocketMQ削峰用法,用于減少數(shù)據(jù)庫壓力的業(yè)務(wù)場景,需要的朋友可以參考下

前言

MQ的主要特點(diǎn)為解耦、異步、削峰,該文章主要記錄與分享個(gè)人在實(shí)際項(xiàng)目中的RocketMQ削峰用法,用于減少數(shù)據(jù)庫壓力的業(yè)務(wù)場景,其中RocketMQ的核心組件概念如下:

  • Producer:生產(chǎn)發(fā)送消息
  • Broker:存儲(chǔ)Producer發(fā)送過來的消息
  • Consumer:從Broker拉取消息并進(jìn)行消費(fèi)
  • NameServer:為Producer或Consumer路由到Broker

在這里插入圖片描述

其中消費(fèi)流程有以下幾點(diǎn)是必須注意的:

  • RocketMQ的Consumer獲取消息是通過向Broker發(fā)送拉取請(qǐng)求獲取的,而不是由Broker發(fā)送Consumer接收的方式。
  • Consumer每次拉取消息時(shí)消息都會(huì)被均勻分發(fā)到消息隊(duì)列再進(jìn)行傳輸,所以RocketMQ中的很多參數(shù)都是針對(duì)隊(duì)列而不是Topic的(這個(gè)是重點(diǎn),順便吐槽下源碼的文檔講的真不清晰,很多都需要自己試錯(cuò),但Dashboard做得很好),其中每個(gè)Broker消息隊(duì)列(ConsumeQueue)的數(shù)量都可以通過RocketMQ DashBoard實(shí)時(shí)更改調(diào)整。

rocketmq-spring-boot-starter 用法簡介

當(dāng)開發(fā)中需要快速集成RocketMQ時(shí)可以考慮使用 rocketmq-spring-boot-starter 搭建RocketMQ的集成環(huán)境,但該框架并不完全具備RocketMQ所有的配置簡化,如需批量消費(fèi)消息便需要自定義一個(gè)DefaultMQPushConsumer bean去消費(fèi)了。 個(gè)人在開發(fā)中常用的rocketmq-spring-boot-starter相關(guān)類:

  • RocketMQListener接口:消費(fèi)者都需實(shí)現(xiàn)該接口的消費(fèi)方法onMessage(msg)。
  • RocketMQPushConsumerLifecycleListener接口:當(dāng)@RocketMQMessageListener中的配置不足以滿足我們的需求時(shí),可以實(shí)現(xiàn)該接口直接更改消費(fèi)者類DefaultMQPushConsumer配置
  • @RocketMQMessageListener:被該注解標(biāo)注并實(shí)現(xiàn)了接口RocketMQListener的bean為一個(gè)消費(fèi)者并監(jiān)聽指定topic隊(duì)列中的消息,該注解中包含消費(fèi)者的一些常用配置(大部分按默認(rèn)即可),一般只需更改consumerGroup(消費(fèi)組)與topic。
    RocketMQMessageListener中的屬性配置是可以使用Placeholder(占位符)從配置文件或配置中心獲取的,如下圖:

在這里插入圖片描述

業(yè)務(wù)案例

有一個(gè)點(diǎn)贊業(yè)務(wù),不限制用戶的點(diǎn)贊數(shù)只需進(jìn)行記錄(產(chǎn)品需求,開發(fā)提議無效),當(dāng)每個(gè)用戶都進(jìn)行x連擊享受數(shù)量猛增的快感時(shí)如果數(shù)據(jù)庫都需要進(jìn)行x個(gè)點(diǎn)贊數(shù)據(jù)的插入,數(shù)據(jù)庫毫無疑問會(huì)塞死導(dǎo)致崩潰。

于是想到可以嘗試下MQ削峰,比如每秒來了5000消息但數(shù)據(jù)庫只能承受2000,那我消費(fèi)時(shí)每次只拉取消費(fèi)1600就好了,剩下的放在Broker堆積慢慢消費(fèi)就好。由于之前的消息中心也在用RocketMQ,于是確認(rèn)使用RocketMQ來進(jìn)行削峰。

在這里插入圖片描述

環(huán)境配置

文章例子環(huán)境:1NameServer + 2Broker + 1Consumer

添加maven依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

application.yml配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: praise-group
server:
  port: 10000
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: tiger
    url: jdbc:mysql://localhost:3306/wilson
swagger:
  docket:
    base-package: io.rocket.consumer.controller

點(diǎn)贊接口

PraiseRecord(點(diǎn)贊記錄):

@Data
public class PraiseRecord implements Serializable {
    private Long id;
    private Long uid;
    private Long liveId;
    private LocalDateTime createTime;
}

MessageController(簡單的測試接口):

RestController
@RequestMapping("/message")
public class MessageController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @PostMapping("/praise")
    public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
        rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
        return ServerResponse.success();
    }
    // ......
}

由于用戶可以連續(xù)點(diǎn)贊,所以考慮可以在點(diǎn)贊消息的處理上寬松一點(diǎn)(容許消息丟失)以追求更高的性能,因此選擇使用sendOneyWay()進(jìn)行消息發(fā)送。Java知音公眾號(hào)內(nèi)回復(fù)“面試題聚合”,送你一份面試題寶典

RocketMQ的消息發(fā)送方式主要含syncSend()同步發(fā)送、asyncSend()異步發(fā)送、sendOneWay()三種方式,sendOneWay()也是異步發(fā)送,區(qū)別在于不需等待Broker返回確認(rèn),所以可能會(huì)存在信息丟失的狀況,但吞吐量更高,具體需根據(jù)業(yè)務(wù)情況選用。

性能:sendOneWay > asyncSend > syncSend

RocketMQTemplate的send()方法默認(rèn)是同步(syncSend)的,更多可看源碼實(shí)現(xiàn)。

PraiseListener:點(diǎn)贊消息消費(fèi)者

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
    @Resource
    private PraiseRecordService praiseRecordService;
    @Override
    public void onMessage(PraiseRecordVO vo) {
        praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
    }
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // 每次拉取的間隔,單位為毫秒
        consumer.setPullInterval(2000);
        // 設(shè)置每次從隊(duì)列中拉取的消息數(shù)為16
        consumer.setPullBatchSize(16);
    }
}

單次pull消息的最大數(shù)目受broker存儲(chǔ)的MessageStoreConfig.maxTransferCountOnMessageInMemory(默認(rèn)為32)值限制,即若想要消費(fèi)者從隊(duì)列拉取的消息數(shù)大于32有效(pullBatchSize>32)則需更改Broker的啟動(dòng)參數(shù)maxTransferCountOnMessageInMemory值。

在MQ削峰的配置參數(shù)里,以下幾個(gè)DefaultMQPushConsumer的參數(shù)是需要注意一下的:

  • pullInterval:每次從Broker拉取消息的間隔,單位為毫秒
  • pullBatchSize:每次從Broker隊(duì)列拉取到的消息數(shù),該參數(shù)很容易讓人誤解,一開始我以為是每次拉取的消息總數(shù),但測試過幾次后確認(rèn)了實(shí)質(zhì)上是從每個(gè)隊(duì)列的拉取數(shù)(源碼上的注釋文檔真的很差,跟沒有一樣),即Consume每次拉取的消息總數(shù)如下:
    EachPullTotal=所有Broker上的寫隊(duì)列數(shù)和(writeQueueNums=readQueueNums) * pullBatchSize
  • consumeMessageBatchMaxSize:每次消費(fèi)(即將多條消息合并為List消費(fèi))的最大消息數(shù)目,默認(rèn)值為1,rocketmq-spring-boot-starter 目前不支持批量消費(fèi)(2.1.0版本)
    在消費(fèi)者開始消息消費(fèi)時(shí)會(huì)先從各隊(duì)列中拉取一條消息進(jìn)行消費(fèi),消費(fèi)成功后再以每次pullBatchSize的數(shù)目進(jìn)行拉取。

PraiseListener中設(shè)置了每次拉取的間隔為2s,每次從隊(duì)列拉取的消息數(shù)為16,在搭建了2master broker且broker上writeQueueNums=readQueueNums=4的環(huán)境下每次拉取的消息理論數(shù)值為16 * 2 * 4 = 128,在第一次從各隊(duì)列拉取1條消息(即共8條)后消費(fèi)成功后會(huì)每次就會(huì)拉取最多128條消息進(jìn)行消費(fèi),想驗(yàn)證下的可以把onMessage()的insert()改為log.info(“1”)然后統(tǒng)計(jì)單位秒內(nèi)打印的日志數(shù)是否為128。

在這里插入圖片描述

根據(jù)以上配置單Conumer情況下每2s理論消費(fèi)為128,即每2秒數(shù)據(jù)庫新增的點(diǎn)贊數(shù)據(jù)大概為128條左右,有20%偏差都在個(gè)人可接受范圍內(nèi),然后對(duì)點(diǎn)贊接口進(jìn)行簡單壓測1s 2000請(qǐng)求校驗(yàn)MQ效果,根據(jù)消費(fèi)配置理論上需要16次拉取即需32s才能消費(fèi)完,壓測后查看數(shù)據(jù)庫校驗(yàn)效果:

在這里插入圖片描述

在這里插入圖片描述

由上圖可以看出除第一次2s和最后一次2s外數(shù)據(jù)庫每2s的插入數(shù)據(jù)數(shù)和一般都在128附近波動(dòng),也用了34s(因第一次拉取數(shù)較少所以比理論多花費(fèi)一次拉取)消費(fèi)的偏差大小可能會(huì)受每次拉取數(shù)pullBatchSize、Broker上的消息隊(duì)列數(shù)、網(wǎng)絡(luò)波動(dòng)等情況影響,但需要的目的已經(jīng)達(dá)到了。

我只想把單位時(shí)間內(nèi)過多的數(shù)據(jù)庫操作交給MQ做分隔成多個(gè)單位時(shí)間內(nèi)的小批量操作,消息過多就堆積,當(dāng)請(qǐng)求峰值過了后直到MQ堆積的消息消費(fèi)完前數(shù)據(jù)庫的插入數(shù)依舊會(huì)與峰值期的插入數(shù)相差不大,達(dá)到了MQ削峰填谷的效果。Java知音公眾號(hào)內(nèi)回復(fù)“面試題聚合”,送你一份面試題寶典

上線了但消費(fèi)效率預(yù)估失誤如何動(dòng)態(tài)更改消費(fèi)效率 ?

當(dāng)把拉取數(shù)pullBatchSize設(shè)置Broker的默認(rèn)最大傳輸值32了,線上又不想重啟Broker更改maxTransferCountOnMessageInMemory參數(shù),如有2個(gè)Broker且queue都為4,那么拉取消費(fèi)效率才為32 * 2 * 4 = 256,如果想要?jiǎng)討B(tài)調(diào)整,可以從Broker數(shù)或Broker隊(duì)列數(shù)下手,可以將Broker的writeQueueNums、readQueueNums增大,如都改為8,那么效率就成了32 * 2 * 8 = 512。

需要注意的是更改完queues后必須去Dashboard的Topic下的CONSUMER MANAGER查看新增的隊(duì)列上是否都有Consumer成功注冊(cè)上去了,因?yàn)橛龅搅嗽跍y試與生產(chǎn)上使用rocketmq-spring-boot-starter @RocketMQListener標(biāo)注消費(fèi)者不會(huì)自動(dòng)注冊(cè)到新隊(duì)列上的情況,但沒排除是不是RocketMQ版本的原因(個(gè)人本地的版本比環(huán)境上的高了一個(gè)小版本0.0.1,本地沒出現(xiàn)沒消費(fèi)者注冊(cè)到新隊(duì)列上的問題),而是使用了自定義DefaultMQPushConsumer bean(原生的方式都是沒有問題的)的備用方案。

當(dāng)再啟動(dòng)新的消費(fèi)者應(yīng)用時(shí)CONSUMER MANAGER(下圖)中就會(huì)出現(xiàn) 新Consumer數(shù) * 各Broker隊(duì)列數(shù)和的隊(duì)列行。

在這里插入圖片描述

如何使用RocketMQ批量消費(fèi) ?

雖然點(diǎn)贊業(yè)務(wù)使用MQ單條插入后TPS已經(jīng)達(dá)到當(dāng)前業(yè)務(wù)指標(biāo)要求了,但考慮到如果后續(xù)要求在不添加機(jī)器數(shù)的情況下增加TPS,且數(shù)據(jù)量還沒到分庫分表的程度,個(gè)人就打算從批量消費(fèi)下手,由一次插入一條點(diǎn)贊記錄改為一次性插入多條(insertBatch)。

當(dāng)然能滿足現(xiàn)有需求能不做肯定不做的,過度優(yōu)化過分礙事,但想多點(diǎn)方案不會(huì)壞事。rocketmq-spring-boot-starter并沒有提供批量消費(fèi)的功能,所以要批量消費(fèi)消息需要自定義DefaultMQPushConsumer并配置其consumeMessageBatchMaxSize屬性。

consumeMessageBatchMaxSize屬性默認(rèn)值為1,即每次只消費(fèi)一條消息,需要注意的是該屬性也會(huì)受pullBatchSize影響,如果consumeMessageBatchMaxSize為32但pullBatchSize只為12,那么每次批量消費(fèi)的最大消息數(shù)也就只有12。

如下為個(gè)人測試批量消費(fèi)Consumer的測試bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
    // 設(shè)置每次消息拉取的時(shí)間間隔,單位毫秒
    consumer.setPullInterval(1000);
    // 設(shè)置每個(gè)隊(duì)列每次拉取的最大消息數(shù)
    consumer.setPullBatchSize(24);
    // 設(shè)置消費(fèi)者單次批量消費(fèi)的消息數(shù)目上限
    consumer.setConsumeMessageBatchMaxSize(12);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)
            -> {
        List<UserInfo> userInfos = new ArrayList<>(msgs.size());
        Map<Integer, Integer> queueMsgMap = new HashMap<>(8);
        msgs.forEach(msg -> {
            userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
            queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);
        });
        log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);
        /*
          處理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos);
         */
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    return consumer;
}

如果默認(rèn)配置情況下log打印出的userInfo size恒為1,但由于設(shè)置了consumeMessageBatchMaxSize與pullBatchSize,且pullBatchSize較小,所以每次消費(fèi)的消息數(shù)最大值為12,如下圖:

在這里插入圖片描述

附本文相關(guān)信息

確保mqnamesrv與mqbroker已啟動(dòng)成功,如該文章環(huán)境的啟動(dòng):

mqnamesrv -n 127.0.0.1:9876
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.properties

到此這篇關(guān)于Java中RocketMQ的流量削峰詳解的文章就介紹到這了,更多相關(guān)RocketMQ的流量削峰內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 如何使用java制作假數(shù)據(jù)接口

    如何使用java制作假數(shù)據(jù)接口

    這篇文章主要介紹了如何使用java制作假數(shù)據(jù)接口,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • 基于JPA的Repository使用詳解

    基于JPA的Repository使用詳解

    這篇文章主要介紹了JPA的Repository使用詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • eclipse報(bào)錯(cuò) eclipse啟動(dòng)報(bào)錯(cuò)解決方法

    eclipse報(bào)錯(cuò) eclipse啟動(dòng)報(bào)錯(cuò)解決方法

    本文將介紹eclipse啟動(dòng)報(bào)錯(cuò)解決方法,需要了解的朋友可以參考下
    2012-11-11
  • 詳解Java如何有效避免空指針

    詳解Java如何有效避免空指針

    空指針,也就是NullPointerException 簡稱NPE的,怕一下子寫出NPE,部分的伙伴看不懂,索性就改成了空指針,在實(shí)際的開發(fā)中,我們最討厭的就是遇到空指針了,業(yè)務(wù)跑著跑著發(fā)現(xiàn)了空指針,所以本文詳細(xì)介紹了Java如何有效的避免空指針,需要的朋友可以參考下
    2023-12-12
  • 微信跳一跳輔助Java代碼實(shí)現(xiàn)

    微信跳一跳輔助Java代碼實(shí)現(xiàn)

    這篇文章主要為大家詳細(xì)介紹了微信跳一跳輔助的Java代碼實(shí)現(xiàn)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-01-01
  • Java多線程之悲觀鎖與樂觀鎖

    Java多線程之悲觀鎖與樂觀鎖

    這篇文章主要為大家詳細(xì)介紹了Java悲觀鎖與樂觀鎖,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • Java的枚舉enum示例詳解

    Java的枚舉enum示例詳解

    這篇文章主要給大家介紹了關(guān)于Java的枚舉enum的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • SpringBoot入門之集成JSP的示例代碼

    SpringBoot入門之集成JSP的示例代碼

    這篇文章主要介紹了SpringBoot入門之集成JSP的示例代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-07-07
  • JavaWeb實(shí)現(xiàn)學(xué)生管理系統(tǒng)的超詳細(xì)過程

    JavaWeb實(shí)現(xiàn)學(xué)生管理系統(tǒng)的超詳細(xì)過程

    學(xué)生信息管理系統(tǒng)是針對(duì)學(xué)校人事處的大量業(yè)務(wù)處理工作而開發(fā)的管理軟件,主要用于學(xué)校學(xué)生信息管理,下面這篇文章主要給大家介紹了關(guān)于JavaWeb實(shí)現(xiàn)學(xué)生管理系統(tǒng)的超詳細(xì)過程,需要的朋友可以參考下
    2023-05-05
  • idea Gradle 控制臺(tái)中文亂碼的解決

    idea Gradle 控制臺(tái)中文亂碼的解決

    通過IDEA執(zhí)行g(shù)radle的任務(wù)時(shí),在終端的輸出出現(xiàn)中文亂碼,本文主要介紹了idea Gradle 控制臺(tái)中文亂碼的解決,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-03-03

最新評(píng)論