欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息的示例代碼

 更新時(shí)間:2025年10月24日 11:08:29   作者:匆匆忙忙游刃有余  
本文主要介紹了SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息案例詳解,包括基于延遲級(jí)別和基于具體時(shí)間兩種方式的完整實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

下面將詳細(xì)介紹如何在SpringBoot中使用RocketMQ實(shí)現(xiàn)延遲消息,包括基于延遲級(jí)別和基于具體時(shí)間兩種方式的完整實(shí)現(xiàn)。

一、延遲消息概述

RocketMQ提供了兩種類型的延遲消息機(jī)制:

  1. 延遲消息:消息發(fā)送后延遲指定的時(shí)間長(zhǎng)度再被消費(fèi)
  2. 定時(shí)消息:消息在指定的具體時(shí)間點(diǎn)被消費(fèi)

這兩種機(jī)制在訂單超時(shí)取消、會(huì)議提醒、定時(shí)任務(wù)調(diào)度等場(chǎng)景中有廣泛應(yīng)用。

二、環(huán)境準(zhǔn)備

1. 添加Maven依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 配置文件設(shè)置

application.yml中配置RocketMQ連接信息:

rocketmq:
  name-server: localhost:9876
  producer:
    group: delay-message-producer-group

三、延遲級(jí)別機(jī)制實(shí)現(xiàn)

1. 默認(rèn)延遲級(jí)別

RocketMQ默認(rèn)提供18個(gè)延遲級(jí)別,定義在MessageStoreConfig類中:

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

對(duì)應(yīng)關(guān)系:

  • level=1: 延遲1秒
  • level=2: 延遲5秒
  • level=3: 延遲10秒
  • level=4: 延遲30秒
  • level=5: 延遲1分鐘
  • level=6: 延遲2分鐘
  • ...以此類推
  • level=18: 延遲2小時(shí)

2. 基于延遲級(jí)別的生產(chǎn)者實(shí)現(xiàn)

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class DelayLevelProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 發(fā)送基于延遲級(jí)別的消息
     * @param topic 主題
     * @param tag 標(biāo)簽
     * @param message 消息內(nèi)容
     * @param delayLevel 延遲級(jí)別(1-18)
     */
    public void sendMessageByDelayLevel(String topic, String tag, String message, int delayLevel) {
        // 創(chuàng)建消息
        Message<String> springMessage = MessageBuilder.withPayload(message).build();
        
        // 發(fā)送延遲消息
        SendResult sendResult = rocketMQTemplate.syncSend(
            topic + ":" + tag, 
            springMessage, 
            3000, // 超時(shí)時(shí)間
            delayLevel // 延遲級(jí)別
        );
        
        System.out.println("延遲級(jí)別消息發(fā)送成功: " + sendResult);
    }
    
    /**
     * 發(fā)送訂單超時(shí)取消消息(延遲15分鐘)
     */
    public void sendOrderTimeoutMessage(String orderId) {
        String message = "訂單超時(shí)取消: " + orderId;
        // 15分鐘對(duì)應(yīng)level=14(根據(jù)默認(rèn)配置)
        sendMessageByDelayLevel("OrderTopic", "Timeout", message, 14);
    }
}

四、基于具體時(shí)間的延遲消息實(shí)現(xiàn)

1. 定時(shí)消息生產(chǎn)者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ScheduledMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 發(fā)送延遲指定毫秒數(shù)的消息
     */
    public void sendMessageWithDelayMs(String topic, String message, long delayMs) {
        // 計(jì)算投遞時(shí)間
        long deliverTimeMs = System.currentTimeMillis() + delayMs;
        
        // 創(chuàng)建消息并設(shè)置投遞時(shí)間
        Message<String> springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_DELAY_TIME_MS, String.valueOf(delayMs))
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println("延遲毫秒消息發(fā)送成功: " + sendResult);
    }
    
    /**
     * 發(fā)送指定時(shí)間點(diǎn)投遞的消息
     */
    public void sendMessageAtTime(String topic, String message, Date deliverTime) {
        long deliverTimeMs = deliverTime.getTime();
        
        // 創(chuàng)建消息并設(shè)置投遞時(shí)間
        Message<String> springMessage = MessageBuilder.withPayload(message)
            .setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
            .build();
        
        SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
        System.out.println("定時(shí)投遞消息發(fā)送成功: " + sendResult);
    }
    
    /**
     * 發(fā)送10秒后投遞的消息
     */
    public void sendTenSecondsLaterMessage(String topic, String message) {
        sendMessageWithDelayMs(topic, message, 10000L);
    }
}

五、消費(fèi)者實(shí)現(xiàn)

延遲消息的消費(fèi)者與普通消息消費(fèi)者相同,無(wú)需特殊配置:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@RocketMQMessageListener(
    topic = "OrderTopic",
    consumerGroup = "delay-message-consumer-group",
    selectorExpression = "Timeout"
)
public class OrderTimeoutConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println("[" + now + "] 接收到訂單超時(shí)消息: " + message);
        
        // 處理訂單取消邏輯
        processOrderCancellation(message);
    }
    
    private void processOrderCancellation(String message) {
        // 提取訂單ID
        String orderId = message.substring(message.indexOf(":") + 2);
        System.out.println("執(zhí)行訂單取消操作,訂單ID: " + orderId);
        // 這里可以調(diào)用訂單服務(wù)進(jìn)行取消操作
    }
}

六、Controller層實(shí)現(xiàn)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;

import java.util.Date;

@RestController
@RequestMapping("/api/delay")
public class DelayMessageController {
    
    @Autowired
    private DelayLevelProducer delayLevelProducer;
    
    @Autowired
    private ScheduledMessageProducer scheduledMessageProducer;
    
    /**
     * 發(fā)送基于延遲級(jí)別的消息
     */
    @PostMapping("/level")
    public String sendByDelayLevel(
            @RequestParam String topic,
            @RequestParam String tag,
            @RequestParam String message,
            @RequestParam(defaultValue = "3") int delayLevel) {
        
        delayLevelProducer.sendMessageByDelayLevel(topic, tag, message, delayLevel);
        return "延遲級(jí)別消息發(fā)送成功,延遲級(jí)別: " + delayLevel;
    }
    
    /**
     * 發(fā)送訂單超時(shí)取消消息
     */
    @PostMapping("/order/timeout")
    public String sendOrderTimeout(@RequestParam String orderId) {
        delayLevelProducer.sendOrderTimeoutMessage(orderId);
        return "訂單超時(shí)取消消息已發(fā)送,訂單ID: " + orderId;
    }
    
    /**
     * 發(fā)送延遲指定毫秒的消息
     */
    @PostMapping("/milliseconds")
    public String sendByDelayMs(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam long delayMs) {
        
        scheduledMessageProducer.sendMessageWithDelayMs(topic, message, delayMs);
        return "延遲毫秒消息發(fā)送成功,延遲: " + delayMs + "ms";
    }
    
    /**
     * 發(fā)送指定時(shí)間點(diǎn)的消息
     */
    @PostMapping("/scheduled")
    public String sendScheduled(
            @RequestParam String topic,
            @RequestParam String message,
            @RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date deliverTime) {
        
        scheduledMessageProducer.sendMessageAtTime(topic, message, deliverTime);
        return "定時(shí)消息發(fā)送成功,投遞時(shí)間: " + deliverTime;
    }
}

七、自定義延遲級(jí)別配置

在Broker的配置文件中可以自定義延遲級(jí)別:

# 在broker.conf文件中添加
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h

重啟Broker使其生效。注意,修改延遲級(jí)別后,所有使用延遲級(jí)別的消息都會(huì)使用新的配置。

八、兩種實(shí)現(xiàn)方式對(duì)比

特性基于延遲級(jí)別基于具體時(shí)間
靈活性較低,只能使用預(yù)定義級(jí)別高,可以精確到毫秒
適用版本全版本支持RocketMQ 5.x及以上版本完整支持
使用場(chǎng)景固定延遲時(shí)間的場(chǎng)景需要精確控制投遞時(shí)間的場(chǎng)景
配置復(fù)雜度簡(jiǎn)單,無(wú)需額外配置可能需要在Broker端開啟相關(guān)功能

九、使用注意事項(xiàng)

  1. 延遲精度

    • 延遲消息的投遞時(shí)間不是完全精確的,有一定誤差
    • 在高并發(fā)場(chǎng)景下,誤差可能會(huì)增大
  2. 版本兼容性

    • 基于具體時(shí)間的延遲消息在RocketMQ 5.x版本支持更完善
    • 在低版本中可能需要使用延遲級(jí)別機(jī)制
  3. 性能考慮

    • 大量延遲消息可能會(huì)增加Broker的負(fù)擔(dān)
    • 對(duì)于長(zhǎng)時(shí)間延遲的消息,考慮使用其他方案(如定時(shí)任務(wù)+消息隊(duì)列組合)
  4. 消息可靠性

    • 延遲消息同樣支持持久化,確保Broker重啟后不會(huì)丟失
    • 建議開啟消息確認(rèn)機(jī)制確保消息可靠投遞

十、測(cè)試示例

  1. 發(fā)送訂單超時(shí)取消消息(延遲15分鐘):

    POST /api/delay/order/timeout?orderId=ORDER123456
    
  2. 發(fā)送10秒后投遞的消息:

    POST /api/delay/milliseconds?topic=TestTopic&message=HelloDelay&delayMs=10000
    
  3. 發(fā)送指定時(shí)間點(diǎn)的消息:

    POST /api/delay/scheduled?topic=TestTopic&message=HelloScheduled&deliverTime=2024-12-25%2000:00:00
    

通過以上配置和代碼,您可以在SpringBoot項(xiàng)目中輕松實(shí)現(xiàn)基于RocketMQ的延遲消息功能,滿足各種定時(shí)任務(wù)和延遲處理的業(yè)務(wù)需求。

到此這篇關(guān)于SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot RocketMQ 延遲內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 關(guān)于Java?中?Future?的?get?方法超時(shí)問題

    關(guān)于Java?中?Future?的?get?方法超時(shí)問題

    這篇文章主要介紹了Java?中?Future?的?get?方法超時(shí),最常見的理解就是,“超時(shí)以后,當(dāng)前線程繼續(xù)執(zhí)行,線程池里的對(duì)應(yīng)線程中斷”,真的是這樣嗎?本文給大家詳細(xì)介紹,需要的朋友參考下吧
    2022-06-06
  • RabbitMQ工作模式中的發(fā)布確認(rèn)模式示例詳解

    RabbitMQ工作模式中的發(fā)布確認(rèn)模式示例詳解

    發(fā)布確認(rèn)模式用于確保消息已經(jīng)被正確地發(fā)送到RabbitMQ服務(wù)器,并被成功接收和持久化,本文通過實(shí)例代碼給大家介紹RabbitMQ工作模式之發(fā)布確認(rèn)模式,感興趣的朋友一起看看吧
    2025-05-05
  • mybatis中返回主鍵一直為1的問題

    mybatis中返回主鍵一直為1的問題

    這篇文章主要介紹了mybatis中返回主鍵一直為1的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 詳解JAVA的封裝

    詳解JAVA的封裝

    Java面向?qū)ο蟮娜筇匦裕悍庋b、繼承、多態(tài)。下面對(duì)三大特性之一封裝進(jìn)行了總結(jié),需要的朋友可以參考下
    2017-04-04
  • 定時(shí)任務(wù)@Scheduled用法及其參數(shù)使用

    定時(shí)任務(wù)@Scheduled用法及其參數(shù)使用

    這篇文章主要介紹了定時(shí)任務(wù)@Scheduled用法及其參數(shù)使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • 關(guān)于Struts2文件上傳與自定義攔截器

    關(guān)于Struts2文件上傳與自定義攔截器

    本篇文章,小編將為大家介紹關(guān)于Struts2文件上傳與自定義攔截器,有需要的朋友可以參考一下
    2013-04-04
  • java實(shí)現(xiàn)計(jì)算器加法小程序(圖形化界面)

    java實(shí)現(xiàn)計(jì)算器加法小程序(圖形化界面)

    這篇文章主要介紹了Java實(shí)現(xiàn)圖形化界面的計(jì)算器加法小程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-05-05
  • Java包裝類之自動(dòng)裝箱與拆箱

    Java包裝類之自動(dòng)裝箱與拆箱

    這篇文章主要介紹了Java包裝類之自動(dòng)裝箱與拆箱,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09
  • springboot如何使用MybatisPlus

    springboot如何使用MybatisPlus

    MyBatisPlus是一個(gè)強(qiáng)大的數(shù)據(jù)庫(kù)操作框架,其代碼生成器可以快速生成實(shí)體類、映射文件等,本文介紹了如何導(dǎo)入MyBatisPlus相關(guān)依賴,創(chuàng)建代碼生成器,并配置數(shù)據(jù)庫(kù)信息以逆向生成代碼,感興趣的朋友跟隨小編一起看看吧
    2024-09-09
  • MyBatis-Plus 使用枚舉自動(dòng)關(guān)聯(lián)注入

    MyBatis-Plus 使用枚舉自動(dòng)關(guān)聯(lián)注入

    本文主要介紹了MyBatis-Plus 使用枚舉自動(dòng)關(guān)聯(lián)注入,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-06-06

最新評(píng)論