欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot配置RocketMQ的詳細(xì)過程

 更新時(shí)間:2025年09月18日 17:25:17   作者:弄個(gè)昵稱  
這篇文章主要介紹了SpringBoot配置RocketMQ的詳細(xì)過程,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
  1. 引入maven
  		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
  1. 配置yml
# rocketmq 配置項(xiàng),對(duì)應(yīng) RocketMQProperties 配置類
rocketmq:
  name-server: 127.0.0.1:9876 # RocketMQ Namesrv
  # Producer 配置項(xiàng)
  producer:
    group: demo-producer-group # 生產(chǎn)者分組
    send-message-timeout: 3000 # 發(fā)送消息超時(shí)時(shí)間,單位:毫秒。默認(rèn)為 3000 。
    compress-message-body-threshold: 4096 # 消息壓縮閥值,當(dāng)消息體的大小超過該閥值后,進(jìn)行消息壓縮。默認(rèn)為 4 * 1024B
    max-message-size: 4194304 # 消息體的最大允許大小。。默認(rèn)為 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步發(fā)送消息時(shí),失敗重試次數(shù)。默認(rèn)為 2 次。
    retry-times-when-send-async-failed: 2 # 異步發(fā)送消息時(shí),失敗重試次數(shù)。默認(rèn)為 2 次。
    retry-next-server: false # 發(fā)送消息給 Broker 時(shí),如果發(fā)送失敗,是否重試另外一臺(tái) Broker 。默認(rèn)為 false
    access-key: # Access Key ,可閱讀 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文檔
    secret-key: # Secret Key
    enable-msg-trace: true # 是否開啟消息軌跡功能。默認(rèn)為 true 開啟??砷喿x https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文檔
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定義消息軌跡的 Topic 。默認(rèn)為 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置項(xiàng)
  consumer:
    listeners: # 配置某個(gè)消費(fèi)分組,是否監(jiān)聽指定 Topic 。結(jié)構(gòu)為 Map<消費(fèi)者分組, <Topic, Boolean>> 。默認(rèn)情況下,不配置表示監(jiān)聽。
      test-consumer-group:
        topic1: false # 關(guān)閉 test-consumer-group 對(duì) topic1 的監(jiān)聽消費(fèi)
  1. 配置變量
/**
 * 消息隊(duì)列相關(guān)常亮配置,包括group、topic、tag
 **/
public class MqTopicConstant {
    /**
     * 示例消息隊(duì)列,topic1個(gè)
     */
    public static final String DEMO_TOPIC = "test-top-1";
    /**
     * 注冊(cè)tag
     */
    public static final String DEMO_TAG_REGISTERED = "registered";
    /**
     * 修改tag
     */
    public static final String DEMO_TAG_MODIFY = "modify";
}
  1. 創(chuàng)建Service
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
public class RocketMQService {
    private static final Logger log = LoggerFactory.getLogger(RocketMQService.class);
    @Resource
    private RocketMQTemplate template;
    /**
     * 發(fā)送普通消息
     *
     * @param topic   topic
     * @param message 消息體
     */
    public void sendMessage(String topic, Object message) {
        this.template.convertAndSend(topic, message);
        log.info("普通消息發(fā)送完成:message = {}", message);
    }
    /**
     * 發(fā)送同步消息
     *
     * @param topic   topic
     * @param message 消息體
     */
    public void syncSendMessage(String topic, Object message) {
        SendResult sendResult = this.template.syncSend(topic, message);
        log.info("同步發(fā)送消息完成:message = {}, sendResult = {}", message, sendResult);
    }
    /**
     * 發(fā)送同步消息
     *
     * @param topic   topic
     * @param message 消息體
     */
    public SendResult syncSendMessageR(String topic, Object message) {
        SendResult sendResult = this.template.syncSend(topic, message);
        log.info("同步發(fā)送消息完成:message = {}, sendResult = {}", message, sendResult);
        SendStatus sendStatus = sendResult.getSendStatus();
        log.info("狀態(tài)打印 : {}" , sendStatus);
        //SEND_OK
        return sendResult;
    }
    /**
     * 發(fā)送異步消息
     *
     * @param topic   topic
     * @param message 消息體
     */
    public void asyncSendMessage(String topic, final Object message) {
        this.template.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("異步消息發(fā)送成功, SendStatus = {}", sendResult.getSendStatus());
//                log.info("異步消息發(fā)送成功,message = {}, SendStatus = {}", message, sendResult.getSendStatus());
            }
            @Override
            public void onException(Throwable e) {
                log.info("異步消息發(fā)送異常,exception = {}", e.getMessage());
            }
        });
    }
    /**
     * 發(fā)送單向消息
     *
     * @param topic   topic
     * @param message 消息體
     */
    public void sendOneWayMessage(String topic, Object message) {
        this.template.sendOneWay(topic, message);
        log.info("單向發(fā)送消息完成:message = {}", message);
    }
    /**
     * 同步發(fā)送批量消息
     *
     * @param topic       topic
     * @param messageList 消息集合
     * @param timeout     超時(shí)時(shí)間(毫秒)
     */
    public void syncSendMessages(String topic, List<Message<?>> messageList, long timeout) {
        this.template.syncSend(topic, messageList, timeout);
        log.info("同步發(fā)送批量消息完成:message = {}", JSON.toJSONString(messageList));
    }
    /**
     * 發(fā)送攜帶 tag 的消息(過濾消息)
     *
     * @param topic   topic,RocketMQTemplate將 topic 和 tag 合二為一了,底層會(huì)進(jìn)行
     *                拆分再組裝。只要在指定 topic 時(shí)跟上 {:tags} 就可以指定tag
     *                例如 test-topic:tagA
     * @param message 消息體
     */
    public void syncSendMessageWithTag(String topic, Object message) {
        this.template.syncSend(topic, message);
        log.info("發(fā)送帶 tag 的消息完成:message = {}", message);
    }
    /**
     * 同步發(fā)送延時(shí)消息
     *
     * @param topic      topic
     * @param message    消息體
     * @param timeout    超時(shí)
     * @param delayLevel 延時(shí)等級(jí):現(xiàn)在RocketMq并不支持任意時(shí)間的延時(shí),需要設(shè)置幾個(gè)固定的延時(shí)等級(jí),
     *                   從1s到2h分別對(duì)應(yīng)著等級(jí) 1 到 18,消息消費(fèi)失敗會(huì)進(jìn)入延時(shí)消息隊(duì)列
     *                   "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
     */
    public void syncSendDelay(String topic, Object message, long timeout, int delayLevel) {
        this.template.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout, delayLevel);
        log.info("已同步發(fā)送延時(shí)消息 message = {}", message);
    }
    /**
     * 異步發(fā)送延時(shí)消息
     *
     * @param topic      topic
     * @param message    消息對(duì)象
     * @param timeout    超時(shí)時(shí)間
     * @param delayLevel 延時(shí)等級(jí)
     */
    public void asyncSendDelay(String topic, final Object message, long timeout, int delayLevel) {
        this.template.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("異步發(fā)送延時(shí)消息成功,message = {}", message);
            }
            @Override
            public void onException(Throwable throwable) {
                log.error("異步發(fā)送延時(shí)消息發(fā)生異常,exception = {}", throwable.getMessage());
            }
        }, timeout, delayLevel);
        log.info("已異步發(fā)送延時(shí)消息 message = {}", message);
    }
    /**
     * 發(fā)送事務(wù)消息
     *
     * @param topic       topic,RocketMQTemplate將 topic 和 tag 合二為一了,底層會(huì)進(jìn)行
     *                	  拆分再組裝。只要在指定 topic 時(shí)跟上 {:tags} 就可以指定tag
     *                	  例如 test-topic:tagA
     * @param message    消息對(duì)象
     * @param arg        傳給事務(wù)監(jiān)聽器的參數(shù)(可以作為事務(wù)處理的唯一ID,來驗(yàn)證本地事務(wù))
     */
    public void sendMessageInTransaction(String topic, final Object message ,final Object arg) {
        TransactionSendResult res = this.template.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), arg);
        if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
            log.info("【生產(chǎn)者】事物消息發(fā)送成功;成功結(jié)果:{}", res);
        } else {
            log.info("【生產(chǎn)者】事務(wù)發(fā)送失?。菏≡颍簕}", res);
        }
    }
}
  1. 事務(wù)監(jiān)聽器
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQTransactionListener
public class TranscationRocketListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
   		 // 獲取第三個(gè)參數(shù)(用戶自定義參數(shù))
        Integer customArg = (Integer) arg;
        log.info("執(zhí)行本地事務(wù),自定義參數(shù): {}", customArg);
        String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
        log.info("這里是校驗(yàn)TAG: {}" , tag  );
		//RocketMQLocalTransactionState.COMMIT
		//RocketMQLocalTransactionState.ROLLBACK
		//RocketMQLocalTransactionState.UNKNOWN
        return RocketMQLocalTransactionState.UNKNOWN;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("檢查本地交易: {}", message);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

狀態(tài)解釋

 - RocketMQLocalTransactionState.COMMIT:
   	含義: 表示本地事務(wù)已成功執(zhí)行,允許提交消息。這意味著消息將對(duì)消費(fèi)者可見,可以被正常消費(fèi)。
	使用場(chǎng)景: 當(dāng)您的業(yè)務(wù)邏輯成功執(zhí)行且希望該消息能夠被下游系統(tǒng)處理時(shí),應(yīng)返回此狀態(tài)。
 - RocketMQLocalTransactionState.ROLLBACK:
 	含義: 表示本地事務(wù)執(zhí)行失敗,要求回滾消息。即該消息不會(huì)被發(fā)送給任何消費(fèi)者。
	使用場(chǎng)景: 如果您的業(yè)務(wù)邏輯執(zhí)行過程中遇到錯(cuò)誤或異常情況,不希望該消息影響下游系統(tǒng),則應(yīng)回滾事務(wù),返回此狀態(tài)。
 - RocketMQLocalTransactionState.UNKNOWN:
	含義: 表示當(dāng)前無法確定事務(wù)的狀態(tài),可能是因?yàn)榫W(wǎng)絡(luò)問題或其他原因?qū)е聲簳r(shí)無法判斷。RocketMQ 會(huì)定期調(diào)用 checkLocalTransaction 方法來檢查事務(wù)的狀態(tài)。
	使用場(chǎng)景: 當(dāng)您不確定事務(wù)是否成功完成時(shí)(例如,遠(yuǎn)程服務(wù)調(diào)用超時(shí)),可以返回此狀態(tài)。RocketMQ 將通過 checkLocalTransaction 方法嘗試再次確認(rèn)事務(wù)狀態(tài)。
  1. 測(cè)試發(fā)送消息
@Autowired
    private RocketMQService rocketMQService;
	## 發(fā)送事務(wù)消息
    rocketMQService.sendMessageInTransaction(GpsConstants.DEDUCTION_MESSP_TOPIC, "這是測(cè)試消息");
	## 消費(fèi)事務(wù)消息
   	@Service
    @RocketMQMessageListener(topic = GpsConstants.DEDUCTION_MESSP_TOPIC
            , consumerGroup = GpsConstants.DEDUCTION_MESSP_GROUP)
    public class deductionTopic implements RocketMQListener<String> {
        @Override
        public void onMessage(String msg) {
             System.out.println("msg : " + msg);
			//這里就會(huì)打印msg : 這是測(cè)試消息
        }
    }

提示 : 同一個(gè)topic,不同的consumerGroup都會(huì)消費(fèi),根據(jù)自己的業(yè)務(wù)指定不同的 consumerGroup 處理不同的業(yè)務(wù),如果不需要,則一個(gè)topic只能用一次

到此這篇關(guān)于SpringBoot配置RocketMQ的詳細(xì)過程的文章就介紹到這了,更多相關(guān)SpringBoot配置RocketMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 系統(tǒng)運(yùn)維問題排查-java內(nèi)存過高分析及說明

    系統(tǒng)運(yùn)維問題排查-java內(nèi)存過高分析及說明

    本文總結(jié)了監(jiān)控Java進(jìn)程內(nèi)存與線程狀態(tài)的常用命令及參數(shù),包括top排序、jmap查看內(nèi)存分布、jstat分析GC數(shù)據(jù)、jstack解析線程狀態(tài)等,強(qiáng)調(diào)需使用與進(jìn)程一致的用戶執(zhí)行,并解析了線程狀態(tài)和內(nèi)存區(qū)域的含義
    2025-07-07
  • PostgreSQL Docker部署+SpringBoot集成方式

    PostgreSQL Docker部署+SpringBoot集成方式

    本文介紹了如何在Docker中部署PostgreSQL和pgadmin,并通過SpringBoot集成PostgreSQL,主要步驟包括安裝PostgreSQL和pgadmin,配置防火墻,創(chuàng)建數(shù)據(jù)庫和表,以及在SpringBoot中配置數(shù)據(jù)源和實(shí)體類
    2024-12-12
  • java 字符串池的深入理解

    java 字符串池的深入理解

    這篇文章主要介紹了java 字符串池的深入理解的相關(guān)資料,這里提供實(shí)例代碼幫助大家學(xué)習(xí)理解這部分內(nèi)容,希望大家能夠掌握,需要的朋友可以參考下
    2017-08-08
  • 基于SpringBoot整合SSMP案例(開啟日志與分頁查詢條件查詢功能實(shí)現(xiàn))

    基于SpringBoot整合SSMP案例(開啟日志與分頁查詢條件查詢功能實(shí)現(xiàn))

    這篇文章主要介紹了基于SpringBoot整合SSMP案例(開啟日志與分頁查詢條件查詢功能實(shí)現(xiàn)),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋參考下吧
    2023-11-11
  • JAVA異常體系結(jié)構(gòu)詳解

    JAVA異常體系結(jié)構(gòu)詳解

    Java把異常當(dāng)作對(duì)象來處理,并定義一個(gè)基類java.lang.Throwable作為所有異常的超類,下面通過本文給大家分享JAVA異常體系結(jié)構(gòu),感興趣的朋友一起看看吧
    2017-11-11
  • 一次因Java應(yīng)用造成CPU過高的排查實(shí)踐過程

    一次因Java應(yīng)用造成CPU過高的排查實(shí)踐過程

    一個(gè)應(yīng)用占用CPU很高,除了確實(shí)是計(jì)算密集型應(yīng)用之外,通常原因都是出現(xiàn)了死循環(huán)。下面這篇文章主要給大家介紹了一次因Java應(yīng)用造成CPU過高的排查實(shí)踐過程,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2018-11-11
  • Mybatis如何獲取insert新增數(shù)據(jù)id值

    Mybatis如何獲取insert新增數(shù)據(jù)id值

    這篇文章主要介紹了Mybatis如何獲取insert新增數(shù)據(jù)id值問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • 面向?qū)ο蠛兔嫦蜻^程的區(qū)別(動(dòng)力節(jié)點(diǎn)java學(xué)院整理)

    面向?qū)ο蠛兔嫦蜻^程的區(qū)別(動(dòng)力節(jié)點(diǎn)java學(xué)院整理)

    很多朋友不清楚面向?qū)ο蠛兔嫦蜻^程有什么區(qū)別,接下來小編給大家整理了關(guān)于面向?qū)ο蠛兔嫦蜻^程的區(qū)別講解,感興趣的朋友可以參考下
    2017-04-04
  • Java異常處理中的一些特殊情況舉例

    Java異常處理中的一些特殊情況舉例

    這篇文章主要介紹了Java異常處理中的一些特殊情況舉例,分別是只用try和finally不用catch,以及finally語句不被執(zhí)行的情況,需要的朋友可以參考下
    2015-11-11
  • java數(shù)字和中文算數(shù)驗(yàn)證碼的實(shí)現(xiàn)

    java數(shù)字和中文算數(shù)驗(yàn)證碼的實(shí)現(xiàn)

    這篇文章主要介紹了java數(shù)字和中文算數(shù)驗(yàn)證碼的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07

最新評(píng)論