欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SprigBoot整合rocketmq-v5-client-spring-boot的示例詳解

 更新時間:2025年09月18日 17:26:34   作者:弄個昵稱  
這篇文章主要介紹了SprigBoot整合rocketmq-v5-client-spring-boot的詳細過程,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
  1. 安裝RocketMQ 服務端
    Apache RocketMQ官方網站
    5.X文檔地址
    rocketmq-v5-client-spring-boot官方示例
    rocketmq-v5-client-spring-boot-starter maven倉庫
    RocketMQ服務端 的安裝包分為兩種,二進制包和源碼包。這里以5.3.2版本做示例 。 點擊這里 下載 Apache RocketMQ 5.3.2的源碼包。你也可以從這里 下載到二進制包。二進制包是已經編譯完成后可以直接運行的,源碼包是需要編譯后運行的。

本文整合的rocketmq-v5-client-spring-boot 版本2.3.3 , 內部引用的是rocketmq-client-java , 版本5.0.7,使用的是gRPC 協(xié)議 , 使用前建議先把官方文檔與示例看一下,使用的Java環(huán)境是openjdk-17

建議使用jdk11以上版本

ubuntu安裝RocketMQ

查看jdk版本

java -version

如果沒安裝jdk的話,在root賬號下執(zhí)行以下命令

apt-get upgrade
apt-get update
apt install openjdk-17-jdk

創(chuàng)建文件夾

cd /usr/local
mkdir rocketmq
cd rocketmq

下載RocketMQ二進制包(我這里使用的是5.3.2版本,使用其他版本可前往官方下載)

wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.2/rocketmq-all-5.3.2-bin-release.zip

解壓 沒有unzip的解壓命令 , ubuntu會提示,根據提示安裝unzip插件

unzip rocketmq-all-5.3.2-bin-release.zip

進入bin目錄
調整runserver.sh的內存大小

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

調整runbroker.sh內存大小

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

修改/conf/broker.conf配置文件,這里主要是為了測試方便我們放開自動創(chuàng)建Topic的配置,加入以下配置(經過測試5.0以上版本不支持自動創(chuàng)建主題topic)

# 開啟自動創(chuàng)建 Topic  加不加都行
autoCreateTopicEnable=true
#內網ip  namesrvAddr:nameSrv地址 公網訪問設置公網IP  內網訪問設置內網IP   以下所有IP需一致
namesrvAddr=192.168.3.86:9876   
#brokerIP1:broker也需要一個ip  內網或公網
brokerIP1=192.168.3.86

配置 NameServer 的環(huán)境變量

配置環(huán)境

 vim /etc/profile

添加以下配置

#MQ安裝位置
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-5.3.2
#MQ公網或內網ip  公網訪問設置公網IP  內網訪問設置內網IP
export NAMESRV_ADDR=192.168.3.86:9876

重新編譯文件生效

source /etc/profile

修改完后,我們就可以啟動 RocketMQ 的 NameServer 了

啟動 namesrv

nohup sh bin/mqnamesrv &

驗證

# 驗證 namesrv 是否啟動成功
tail -f -n 500 mqnamesrv.log
...
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
# 或者是
tail -f ~/logs/rocketmqlogs/namesrv.log

啟動 Broker 消息存儲中心和 Proxy 代理

# 啟動(不使用代理)
nohup sh bin/mqbroker -n 192.168.3.86:9876 >mqbroker.log 2>&1 &
# 啟動 Broker+Proxy
nohup sh bin/mqbroker -n 192.168.3.86:9876 --enable-proxy &
# 推薦使用 指定配置文件啟動(broker默認使用的端口是10911,我們也可以在配置文件修改端口)
nohup sh bin/mqbroker -n 192.168.3.86:9876 -c conf/broker.conf --enable-proxy &
# 驗證是否啟動成功
tail -n 500 nohup.out
tail -f ~/logs/rocketmqlogs/broker.log 
tail -f ~/logs/rocketmqlogs/proxy.log 
Wed May 14 12:41:41 CST 2025 rocketmq-proxy startup successfully

使用tail -f ~/logs/rocketmqlogs/broker.log 查看日志如果提示以下

The default acl dir /usr/local/rocketmq/rocketmq-all-5.3.2/conf/acl is not exist

需要切換conf目錄下 新建acl文件夾就行了

mkdir acl

由于v5可參考的文檔太少,這個報錯我也沒找到為什么源碼包里會少一個acl的文件夾,有知道的希望留言告知

測試消息收發(fā)

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

可以通過 mqadmin 命令創(chuàng)建

Admin Tool官方命令工具

注意 TestTopic 是topic名稱

sh bin/mqadmin updatetopic -n 192.168.3.86:9876 -t TestTopic -c DefaultCluster

打印

create topic to 192.168.3.86:10911 success.
TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]

安裝RocketMQ Dashboard 可視化
官方介紹
按照官方安裝運行就可以
需要注意的點,IP需要跟上面設置的IP一致,防火墻開通8080,8081,10911,9876

注意:默認端口為:8080,不修改會跟Proxy端口沖突,Proxy端口默認的也是8080
我這里修改了RocketMQ Dashboard的默認端口,改成8082
可以在本地運行,也可以打包運行,我是打包運行的,運行成功后訪問:http://192.168.3.86:8082

可以先添加主題,(研究了幾天,沒研究自動添加的方法 , 如果那位大佬研究出來了可以給我分享一下)

點擊提交就行了。研究了源碼,這個添加跟更新是一個接口。提交之后就可以敲代碼了(有研究出來能自動加載Topic的麻煩留言)

如果啟動報錯,需添加 topic

CODE: 17  DESC: No topic route info in name server for the topic: delay-topic

整合rocketmq-client-java

先用官方原生 rocketmq-client-java JDK

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.7</version>
        </dependency>

官方示例代碼可以去看看

發(fā)送普通消息

package com.example.mq.producer;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
    public static void main(String[] args) throws ClientException {
        // 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
        String endpoint = "192.168.3.86:8081";
        // 消息發(fā)送的目標Topic名稱,需要提前創(chuàng)建。
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // 初始化Producer時需要設置通信配置以及預綁定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // 普通消息發(fā)送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 設置消息索引鍵,可根據關鍵字精確查找某條消息。
                .setKeys("messageKey")
                // 設置消息Tag,用于消費端根據指定Tag過濾消息。
                .setTag("messageTag")
                // 消息體。
                .setBody("你好,mq".getBytes())
                .build();
        try {
            // 發(fā)送消息,需要關注發(fā)送結果,并捕獲失敗等異常。
            SendReceipt sendReceipt = producer.send(message);
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            logger.error("Failed to send message", e);
        }
        // producer.close();
    }
}

訂閱

package com.example.mq.consumer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
    private PushConsumerExample() {
    }
    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入點地址,需要設置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "192.168.3.86:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // 訂閱消息的過濾規(guī)則,表示訂閱所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 為消費者指定所屬的消費者分組,Group需要提前創(chuàng)建。
        String consumerGroup = "YourConsumerGroup";
        // 指定需要訂閱哪個目標Topic,Topic需要提前創(chuàng)建。
        String topic = "TestTopic";
        // 初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通信參數以及訂閱關系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 設置消費者分組。
                .setConsumerGroup(consumerGroup)
                // 設置預綁定的訂閱關系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 設置消費監(jiān)聽器。
                .setMessageListener(messageView -> {
                 // 處理消息并返回消費結果。
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    ByteBuffer body = messageView.getBody();
                    String message = StandardCharsets.UTF_8.decode(body).toString();
                    logger.info("Consume message successfully, body={}", message);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可關閉該實例。
        // pushConsumer.close();
    }
}

4.整合rocketmq-v5-client-spring-boot

整合SpringBoot rocketmq-v5-client-spring-boot

注意原生的rocketmq-client-java需要注釋掉,rocketmq-v5-client-spring-boot已經引入了,不注釋會jar包沖突

引入maven

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-v5-client-spring-boot</artifactId>
   <version>2.3.3</version>
</dependency>

官方示例代碼

先創(chuàng)建變量

public class RocketMQVariable {
    /**
     * 普通消息隊列
     */
    public static final String NORMAL_TOPIC = "normal-topic";
    // 如果使用負載均衡模式 需要設置相同的消費組名
    public static final String NORMAL_GROUP = "normal-group";
    // 處理廣播消費模式使用 
    public static final String NORMAL1_GROUP = "normal1-group";
    /**
     * 異步普通消息隊列
     */
    public static final String ASYNC_NORMAL_TOPIC = "async-normal-topic";
    public static final String ASYNC_NORMAL_GROUP = "async-normal-group";
    /**
     * 順序消息隊列
     */
    public static final String FIFO_TOPIC = "fifo-topic";
    public static final String FIFO_GROUP = "fifo-group";
    /**
     * 定時/延時消息隊列
     */
    public static final String DELAY_TOPIC = "delay-topic";
    public static final String DELAY_GROUP = "delay-group";
    /**
     * 事務消息隊列
     */
    public static final String TRANSACTION_TOPIC = "transaction-topic";
    public static final String TRANSACTION_GROUP = "transaction-group";
}

注意:

如果使用負載均衡模式 需設置相同的Topic 相同的group
如果使用廣播消費模式 需設置相同的Topic 不同的group

編寫工具類

import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.Resource;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.apache.rocketmq.client.core.RocketMQClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class RocketMQV2Service {
    private static final Logger log = LoggerFactory.getLogger(RocketMQV2Service.class);
    @Resource
    private RocketMQClientTemplate template;
    /**
     * 發(fā)送普通消息
     *
     * @param topic
     * @param message
     */
    public void syncSendNormalMessage(String topic, Object message) {
        SendReceipt sendReceipt = template.syncSendNormalMessage(topic, message);
        log.info("普通消息發(fā)送完成:topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
    }
    /**
     * 發(fā)送異步普通消息
     *
     * @param topic
     * @param message
     */
    public void asyncSendNormalMessage(String topic, Object message) {
        CompletableFuture<SendReceipt> future = new CompletableFuture<>();
        ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
        future.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("發(fā)送消息失敗", throwable);
                return;
            }
            log.info("發(fā)送異步消息消費成功5, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);
        CompletableFuture<SendReceipt> completableFuture = template.asyncSendNormalMessage(topic, message, future);
        log.info("發(fā)送異步消息成功1, topic={},  message = {},  sendReceipt={}", topic, message, completableFuture);
    }
    /**
     * 發(fā)送順序消息
     *
     * @param topic
     * @param message
     * @param messageGroup
     */
    public void syncSendFifoMessage(String topic, Object message, String messageGroup) {
        SendReceipt sendReceipt = template.syncSendFifoMessage(topic, message, messageGroup);
        log.info("順序消息發(fā)送完成:topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
    }
    /**
     * 發(fā)送延時消息
     *
     * @param topic
     * @param message
     * @param delay   單位:秒
     */
    public void syncSendDelayMessage(String topic, Object message, Long delay) {
        SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, Duration.ofSeconds(delay));
        log.info("延時消息發(fā)送完成 :topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
    }
    /**
     * 發(fā)送延時消息
     *
     * @param topic
     * @param message
     * @param duration Duration.ofSeconds(秒)    Duration.ofMinutes(分鐘)    Duration.ofHours(小時)
     */
    public void syncSendDelayMessage(String topic, Object message, Duration duration) {
        SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, duration);
        log.info("延時消息發(fā)送完成 :topic={},  message = {}, sendReceipt = {}", topic, message, sendReceipt);
    }
    /**
     * 發(fā)送事務消息
     *
     * @param topic
     * @param message
     * @throws ClientException
     */
    public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
        try {
            Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
            SendReceipt sendReceipt = pair.getSendReceipt();
            Transaction transaction = pair.getTransaction();
            log.info("事務消息發(fā)送完成   transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
            log.info("消息id  : {}  ", sendReceipt.getMessageId());
            //如果這里提交了事務 
            if (doLocalTransaction(1)) {
                log.info("本地事務執(zhí)行成功");
                transaction.commit();
            } else {
                log.info("本地事務執(zhí)行失敗");
                transaction.rollback();
            }
            return pair;
        } catch (ClientException e) {
            throw new RuntimeException(e);
        }
    }
    boolean doLocalTransaction(int number) {
        //  本地事務邏輯 數據庫操作
        log.info("執(zhí)行本地事務 : {}", number);
        return number > 5;
    }
}

測試代碼

yml配置

rocketmq:
  producer:
    endpoints: 192.168.3.86:8081
    topic:
  push-consumer:
    endpoints: 192.168.3.86:8081
    access-key:
    secret-key:
    topic:
    tag: "*"

發(fā)送消息

import com.alibaba.fastjson2.JSONObject;
import com.example.mq.util.RocketMQV2Service;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.Duration;
@Slf4j
@RestController
@RequestMapping("/send")
public class SendController {
    @Autowired
    private RocketMQV2Service rocketMQV2Service;
    /**
     * 普通消息
     *
     * @return
     */
    @GetMapping("/normal.message")
    public String normalMessage() {
        rocketMQV2Service.syncSendNormalMessage(RocketMQVariable.NORMAL_TOPIC, "hello RocketMQ  這是普通消息");
        return "發(fā)送成功";
    }
    /**
     * 異步普通消息
     *
     * @return
     */
    @GetMapping("/async.normal.message")
    public String asyncSendNormalMessageNormalMessage() {
        rocketMQV2Service.asyncSendNormalMessage(RocketMQVariable.ASYNC_NORMAL_TOPIC, "hello RocketMQ  這是異步普通消息");
        return "發(fā)送成功";
    }
    /**
     * 順序消息
     *
     * @return
     */
    @GetMapping("/flfo.message")
    public String flfoMessage() {
        for (int i = 0; i < 20; i++) {
            rocketMQV2Service.syncSendFifoMessage(RocketMQVariable.FIFO_TOPIC, "hello RocketMQ  這是順序消息" + i, RocketMQVariable.FIFO_GROUP);
        }
        return "發(fā)送成功";
    }
    /**
     * 定時/延時消息
     *
     * @return
     */
    @GetMapping("/delay.message")
    public String delayMessage() {
        rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ  這是30秒定時消息", Duration.ofSeconds(30));
        rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ  這是10秒定時消息  ", 10l);
        rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ  這是1分鐘定時消息", Duration.ofMinutes(1));
        return "發(fā)送成功";
    }
    /**
     * 事務消息
     *
     * @return
     */
    @GetMapping("/transaction.message")
    public String transactionMessage() throws ClientException {
        Pair<SendReceipt, Transaction> sendReceiptTransactionPair = rocketMQV2Service.sendMessageInTransaction(RocketMQVariable.TRANSACTION_TOPIC, "hello RocketMQ  這是事務消息");
        Transaction transaction = sendReceiptTransactionPair.getTransaction();
        SendReceipt sendReceipt = sendReceiptTransactionPair.getSendReceipt();
        MessageId messageId = sendReceipt.getMessageId();
        log.info("事務消息發(fā)送完成   messageId = {}", messageId);
        log.info("事務消息發(fā)送完成   transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
            return "發(fā)送成功";
    }
}

普通消息(廣播模式)

import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
        consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormalService implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("普通消息, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("普通消息, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("普通消息, properties={}", JSONObject.toJSONString(properties));
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("普通消息, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        return ConsumeResult.SUCCESS;
    }
}
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
        consumerGroup = RocketMQVariable.NORMAL1_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("普通消息1, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("普通消息1, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("普通消息1, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        return ConsumeResult.SUCCESS;
    }
}

日志 (注意consumerGroup 不同)

負載均衡模式 把consumerGroup改為 RocketMQVariable.NORMAL_GROUP

import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,
        consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("普通消息1, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("普通消息1, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("普通消息1, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        return ConsumeResult.SUCCESS;
    }
}

日志 (會自動選擇一個消費者消費)

順序消費

import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.FIFO_TOPIC,
        consumerGroup = RocketMQVariable.FIFO_GROUP)
public class PushConsumerFifoService implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("順序消息, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("順序消息, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("順序消息, properties={}", JSONObject.toJSONString(properties));
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("順序消息, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        log.info("rollback transaction");
        return ConsumeResult.SUCCESS;
    }
}

日志

定時/延時任務消息 (可自定義時間)

import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.DELAY_TOPIC,
        consumerGroup = RocketMQVariable.DELAY_GROUP)
public class PushConsumerDelayService implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("定時/延時消息, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("定時/延時消息, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("定時/延時消息, properties={}", JSONObject.toJSONString(properties));
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("定時/延時消息, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        log.info("定時/延時消息 消費完成");
        return ConsumeResult.SUCCESS;
    }
}

日志

事務處理情況1

/**
     * 發(fā)送事務消息
     *
     * @param topic
     * @param message
     * @throws ClientException
     */
    public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
        try {
            Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
            SendReceipt sendReceipt = pair.getSendReceipt();
            Transaction transaction = pair.getTransaction();
            log.info("事務消息發(fā)送完成   transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
            log.info("消息id  : {}  ", sendReceipt.getMessageId());
            //如果這里提交了事務 
            if (doLocalTransaction(1)) {
                log.info("本地事務執(zhí)行成功");
                transaction.commit();
            } else {
                log.info("本地事務執(zhí)行失敗");
                transaction.rollback();
            }
            return pair;
        } catch (ClientException e) {
            throw new RuntimeException(e);
        }
    }
    boolean doLocalTransaction(int number) {
        //  本地事務邏輯 數據庫操作
        log.info("執(zhí)行本地事務 : {}", number);
        return number > 5;
    }

如果在工具類里面提交了事務 transaction.commit();下面的就不會進入處理了

@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {
    @Override
    public TransactionResolution check(MessageView messageView) {
        log.info("Receive transactional message check, message={}", messageView);
        return null;
    }
}

而是直接消費了

import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
public class PushConsumerTransactionService implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("事務消息消費, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("事務消息消費, message={}", message);
        if (Objects.isNull(message)) {
            log.info("事務消息 消費失敗");
            return ConsumeResult.FAILURE;
        }
        log.info("事務消息 消費成功");
        return ConsumeResult.SUCCESS;
    }
}

日志

事務處理情況2

/**
     * 發(fā)送事務消息 這里只發(fā)消息  不參與事務提交
     *
     * @param topic
     * @param message
     * @throws ClientException
     */
    public void sendMessageInTransaction(String topic, Object message) {
        try {
          template.sendMessageInTransaction(topic, message);
        } catch (ClientException e) {
            throw new RuntimeException(e);
        }
    }

使用官方事務處理機制處理事務

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.core.RocketMQTransactionChecker;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {
    @Override
    public TransactionResolution check(MessageView messageView) {
        log.info("事務消息  事務操作, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("事務消息  事務操作, message={}", message);
        String messageId = messageView.getMessageId().toString();
        if (Objects.nonNull(messageId)) {
            log.info("事務消息  事務操作, messageId={}", messageId);
            return TransactionResolution.COMMIT;
        }
        log.info("事務消息消費失敗");
        return TransactionResolution.ROLLBACK;
    }
}

事務提交后才會被消費

import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
public class PushConsumerTransactionService implements RocketMQListener {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("事務消息消費, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("事務消息消費, message={}", message);
        if (Objects.isNull(message)) {
            log.info("事務消息 消費失敗");
            return ConsumeResult.FAILURE;
        }
        log.info("事務消息 消費成功");
        return ConsumeResult.SUCCESS;
    }
}

日志

到此這篇關于SprigBoot整合rocketmq-v5-client-spring-boot的詳細過程的文章就介紹到這了,更多相關SprigBoot rocketmq-v5-client-spring-boot內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Spring Boot詳解配置文件的用途與用法

    Spring Boot詳解配置文件的用途與用法

    SpringBoot項目是一個標準的Maven項目,它的配置文件需要放在src/main/resources/下,其文件名必須為application,其存在兩種文件形式,分別是properties和yaml(或者yml)文件
    2022-06-06
  • PipedWriter和PipedReader源碼分析_動力節(jié)點Java學院整理

    PipedWriter和PipedReader源碼分析_動力節(jié)點Java學院整理

    這篇文章主要介紹了PipedWriter和PipedReader源碼分析_動力節(jié)點Java學院整理,需要的朋友可以參考下
    2017-05-05
  • Spring Boot 集成 MongoDB Template 的步驟詳解

    Spring Boot 集成 MongoDB Template 的步驟

    MongoDB 是一個流行的 NoSQL 數據庫,適合處理大量非結構化數據,本篇文章將詳細介紹如何在 Spring Boot 3.4.0 中集成 MongoDB Template,從零開始構建一個簡單的應用程序,感興趣的朋友一起看看吧
    2024-12-12
  • Java文件(io)編程_文件字節(jié)流的使用方法

    Java文件(io)編程_文件字節(jié)流的使用方法

    下面小編就為大家?guī)硪黄狫ava文件(io)編程_文件字節(jié)流的使用方法。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08
  • Java隊列篇之實現數組模擬隊列及可復用環(huán)形隊列詳解

    Java隊列篇之實現數組模擬隊列及可復用環(huán)形隊列詳解

    像棧一樣,隊列(queue)也是一種線性表,它的特性是先進先出,插入在一端,刪除在另一端。就像排隊一樣,剛來的人入隊(push)要排在隊尾(rear),每次出隊(pop)的都是隊首(front)的人
    2021-10-10
  • Java生成word文檔的示例詳解

    Java生成word文檔的示例詳解

    這篇文章主要為大家詳細介紹了如何利用Java語言生成word文檔,文中的示例代碼講解詳細,具有一定的借鑒價值,需要的小伙伴可以參考一下
    2022-12-12
  • 程序員最喜歡的ThreadLocal使用姿勢

    程序員最喜歡的ThreadLocal使用姿勢

    ThreadLocal并不是一個Thread,而是Thread的局部變量,也許把它命名為ThreadLocalVariable更容易讓人理解一些,下面這篇文章主要給大家介紹了程序員最喜歡的ThreadLocal使用姿勢,需要的朋友可以參考下
    2022-02-02
  • Springboot任務之異步任務的使用詳解

    Springboot任務之異步任務的使用詳解

    今天學習了一個新技能SpringBoot實現異步任務,所以特地整理了本篇文章,文中有非常詳細的介紹及代碼示例,需要的朋友可以參考下
    2021-06-06
  • 通過Java修改游戲存檔的實現思路

    通過Java修改游戲存檔的實現思路

    這篇文章主要介紹了通過Java修改游戲存檔的實現思路,實現方法也很簡單,因為植物大戰(zhàn)僵尸游戲的數據文件存儲在本地的存儲位置是已知的,因此我們可以將實現過程拆分為三個步驟,需要的朋友可以參考下
    2021-10-10
  • 解決springboot 獲取form-data里的file文件的問題

    解決springboot 獲取form-data里的file文件的問題

    這篇文章主要介紹了解決springboot 獲取form-data里的file文件的問題的相關資料,這里提供了詳細的解決步驟,需要的朋友可以參考下
    2017-07-07

最新評論