欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

在SpringBoot中利用RocketMQ實(shí)現(xiàn)批量消息消費(fèi)功能

 更新時(shí)間:2024年11月07日 11:53:31   作者:魔道不誤砍柴功  
RocketMQ 是一款分布式消息隊(duì)列,支持高吞吐、低延遲的消息傳遞,對(duì)于需要一次處理多條消息的場(chǎng)景,RocketMQ 提供了批量消費(fèi)的機(jī)制,這篇文章將展示如何在 Spring Boot 中實(shí)現(xiàn)這一功能,感興趣的小伙伴跟著小編一起來看看吧

準(zhǔn)備工作

在開始之前,請(qǐng)確保你已經(jīng)安裝和配置好 RocketMQ。如果還沒安裝,請(qǐng)參考 RocketMQ 官網(wǎng) 獲取安裝指南。

項(xiàng)目依賴

首先,我們需要在 Spring Boot 項(xiàng)目中添加 RocketMQ 的依賴。打開 pom.xml 文件,添加以下內(nèi)容:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

這個(gè)依賴包包含了與 RocketMQ 集成所需的所有內(nèi)容。

配置 RocketMQ

在 application.yml 文件中添加 RocketMQ 的相關(guān)配置:

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: batchConsumerGroup
  producer:
    group: batchProducerGroup
  • name-server:RocketMQ 服務(wù)的地址
  • consumer.group:消息消費(fèi)的分組
  • producer.group:消息生產(chǎn)的分組

確保 name-server 地址是正確的,指向你的 RocketMQ 服務(wù)。

生產(chǎn)批量消息

創(chuàng)建一個(gè)消息生產(chǎn)者,用于發(fā)送批量消息。以下是 BatchProducer.java 的示例代碼:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class BatchProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendBatchMessages() {
        List<Message<String>> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();
            messages.add(message);
        }
        
        rocketMQTemplate.syncSend("BatchTopic", messages, 10000);
        System.out.println("批量消息發(fā)送成功!");
    }
}
  • 這里,我們創(chuàng)建了 10 條消息并將它們添加到列表 messages 中。
  • 調(diào)用 rocketMQTemplate.syncSend 方法將消息批量發(fā)送到主題 BatchTopic。

消費(fèi)批量消息

接下來,我們創(chuàng)建一個(gè)消息消費(fèi)者,用于批量消費(fèi)消息。以下是 BatchConsumer.java 的示例代碼:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {

    @Override
    public void onMessage(List<String> messages) {
        System.out.println("批量接收到消息:");
        messages.forEach(message -> System.out.println("消息內(nèi)容:" + message));
    }
}

在這段代碼中:

  • @RocketMQMessageListener 注解用于標(biāo)識(shí)這是一個(gè) RocketMQ 的消息監(jiān)聽器,指定了監(jiān)聽的主題 BatchTopic 和消費(fèi)分組 batchConsumerGroup。
  • consumeMessageBatchMaxSize = 10 表示每次批量消費(fèi)最多 10 條消息。
  • onMessage 方法會(huì)處理接收到的消息列表,并逐條打印出消息內(nèi)容。

測(cè)試批量消息發(fā)送和消費(fèi)

創(chuàng)建一個(gè)簡(jiǎn)單的 Spring Boot 控制器,用于觸發(fā)批量消息發(fā)送。以下是 MessageController.java 的代碼:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private BatchProducer batchProducer;

    @GetMapping("/sendBatchMessages")
    public String sendBatchMessages() {
        batchProducer.sendBatchMessages();
        return "批量消息已發(fā)送";
    }
}

通過訪問 http://localhost:8080/sendBatchMessages 觸發(fā)消息發(fā)送。

  • 調(diào)用這個(gè)接口會(huì)將批量消息發(fā)送到 RocketMQ 主題 BatchTopic。
  • BatchConsumer 會(huì)自動(dòng)接收并批量處理這些消息。

總結(jié)

我們成功在 Spring Boot 中實(shí)現(xiàn)了 RocketMQ 的批量消息發(fā)送與消費(fèi):

  • 使用 BatchProducer 類批量發(fā)送消息。
  • 使用 BatchConsumer 類批量消費(fèi)消息,并設(shè)置最大批量大小。
  • 通過簡(jiǎn)單的 REST API 控制消息發(fā)送,確保一切順利。

批量消息處理可以提高消息傳遞的效率,適合高并發(fā)場(chǎng)景。這種方式可以減少網(wǎng)絡(luò)開銷,并有效利用系統(tǒng)資源。

到此這篇關(guān)于在SpringBoot中利用RocketMQ實(shí)現(xiàn)批量消息消費(fèi)功能的文章就介紹到這了,更多相關(guān)SpringBoot RocketMQ消息消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring Boot 和 Spring 到底有啥區(qū)別你知道嗎

    Spring Boot 和 Spring 到底有啥區(qū)別你知道嗎

    Spring Boot框架的核心就是自動(dòng)配置,只要存在相應(yīng)的jar包,Spring就幫我們自動(dòng)配置。接下來通過本文給大家介紹Spring與Spring boot的區(qū)別介紹,非常不錯(cuò),需要的朋友參考下吧
    2021-08-08
  • 電腦上安裝多個(gè)JDK版本時(shí)該如何自由切換(詳細(xì)圖文)

    電腦上安裝多個(gè)JDK版本時(shí)該如何自由切換(詳細(xì)圖文)

    我們?cè)趯W(xué)習(xí)的過程中經(jīng)常用到不同的jdk版本,那么如何在一臺(tái)電腦上同時(shí)安裝多個(gè)jdk版本并進(jìn)行切換呢,這篇文章主要給大家介紹了關(guān)于電腦上安裝多個(gè)JDK版本時(shí)該如何自由切換的相關(guān)資料,需要的朋友可以參考下
    2023-10-10
  • java中使用sax解析xml的解決方法

    java中使用sax解析xml的解決方法

    本篇文章介紹了,在java中使用sax解析xml的解決方法。需要的朋友參考下
    2013-05-05
  • Java中instanceof的基本語法與用法詳解

    Java中instanceof的基本語法與用法詳解

    這篇文章主要介紹了Java中instanceof的基本語法與用法的相關(guān)資料,instanceof是Java中用于檢查對(duì)象是否是某個(gè)類或接口的實(shí)例的二元運(yùn)算符,需要的朋友可以參考下
    2025-03-03
  • JAVA通過Filter實(shí)現(xiàn)允許服務(wù)跨域請(qǐng)求的方法

    JAVA通過Filter實(shí)現(xiàn)允許服務(wù)跨域請(qǐng)求的方法

    這里的域指的是這樣的一個(gè)概念:我們認(rèn)為若協(xié)議 + 域名 + 端口號(hào)均相同,那么就是同域即我們常說的瀏覽器請(qǐng)求的同源策略。這篇文章主要介紹了JAVA通過Filter實(shí)現(xiàn)允許服務(wù)跨域請(qǐng)求,需要的朋友可以參考下
    2018-11-11
  • vue?vxe-table?實(shí)現(xiàn)財(cái)務(wù)記賬憑證的方案

    vue?vxe-table?實(shí)現(xiàn)財(cái)務(wù)記賬憑證的方案

    使用?vxe-table?實(shí)現(xiàn)財(cái)務(wù)記賬憑證非常簡(jiǎn)單,實(shí)現(xiàn)在線實(shí)時(shí)編輯的記賬憑證、自動(dòng)合計(jì)金額等,這篇文章主要介紹了vue?vxe-table?實(shí)現(xiàn)財(cái)務(wù)記賬憑證的方案,需要的朋友可以參考下
    2024-12-12
  • SpringBoot統(tǒng)一功能處理實(shí)現(xiàn)的全過程

    SpringBoot統(tǒng)一功能處理實(shí)現(xiàn)的全過程

    最近在做項(xiàng)目時(shí)需要對(duì)異常進(jìn)行全局統(tǒng)一處理,主要是一些分類入庫以及記錄日志等,下面這篇文章主要給大家介紹了關(guān)于SpringBoot統(tǒng)一功能處理實(shí)現(xiàn)的相關(guān)資料,文中通過圖文以及實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-01-01
  • Mybatis攔截器實(shí)現(xiàn)自定義需求

    Mybatis攔截器實(shí)現(xiàn)自定義需求

    本文主要介紹了Mybatis攔截器實(shí)現(xiàn)自定義需求,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05
  • Spring中的@Cacheable緩存注解詳解

    Spring中的@Cacheable緩存注解詳解

    這篇文章主要介紹了Spring中的@Cacheable緩存注解詳解,數(shù)據(jù)庫查找的流程是先要從磁盤拿到數(shù)據(jù),再刷新到內(nèi)存,再返回?cái)?shù)據(jù)。磁盤相比于內(nèi)存來說,速度是很慢的,為了提升性能,就出現(xiàn)了基于內(nèi)存的緩存,需要的朋友可以參考下
    2023-05-05
  • java反編譯工具jd-gui-osx?for?mac?M1芯片無法使用的問題及解決

    java反編譯工具jd-gui-osx?for?mac?M1芯片無法使用的問題及解決

    這篇文章主要介紹了java反編譯工具jd-gui-osx?for?mac?M1芯片無法使用的問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01

最新評(píng)論