SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息的示例代碼
下面將詳細(xì)介紹如何在SpringBoot中使用RocketMQ實(shí)現(xiàn)延遲消息,包括基于延遲級(jí)別和基于具體時(shí)間兩種方式的完整實(shí)現(xiàn)。
一、延遲消息概述
RocketMQ提供了兩種類型的延遲消息機(jī)制:
- 延遲消息:消息發(fā)送后延遲指定的時(shí)間長(zhǎng)度再被消費(fèi)
- 定時(shí)消息:消息在指定的具體時(shí)間點(diǎn)被消費(fèi)
這兩種機(jī)制在訂單超時(shí)取消、會(huì)議提醒、定時(shí)任務(wù)調(diào)度等場(chǎng)景中有廣泛應(yīng)用。
二、環(huán)境準(zhǔn)備
1. 添加Maven依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2. 配置文件設(shè)置
在application.yml中配置RocketMQ連接信息:
rocketmq:
name-server: localhost:9876
producer:
group: delay-message-producer-group
三、延遲級(jí)別機(jī)制實(shí)現(xiàn)
1. 默認(rèn)延遲級(jí)別
RocketMQ默認(rèn)提供18個(gè)延遲級(jí)別,定義在MessageStoreConfig類中:
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
對(duì)應(yīng)關(guān)系:
- level=1: 延遲1秒
- level=2: 延遲5秒
- level=3: 延遲10秒
- level=4: 延遲30秒
- level=5: 延遲1分鐘
- level=6: 延遲2分鐘
- ...以此類推
- level=18: 延遲2小時(shí)
2. 基于延遲級(jí)別的生產(chǎn)者實(shí)現(xiàn)
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class DelayLevelProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 發(fā)送基于延遲級(jí)別的消息
* @param topic 主題
* @param tag 標(biāo)簽
* @param message 消息內(nèi)容
* @param delayLevel 延遲級(jí)別(1-18)
*/
public void sendMessageByDelayLevel(String topic, String tag, String message, int delayLevel) {
// 創(chuàng)建消息
Message<String> springMessage = MessageBuilder.withPayload(message).build();
// 發(fā)送延遲消息
SendResult sendResult = rocketMQTemplate.syncSend(
topic + ":" + tag,
springMessage,
3000, // 超時(shí)時(shí)間
delayLevel // 延遲級(jí)別
);
System.out.println("延遲級(jí)別消息發(fā)送成功: " + sendResult);
}
/**
* 發(fā)送訂單超時(shí)取消消息(延遲15分鐘)
*/
public void sendOrderTimeoutMessage(String orderId) {
String message = "訂單超時(shí)取消: " + orderId;
// 15分鐘對(duì)應(yīng)level=14(根據(jù)默認(rèn)配置)
sendMessageByDelayLevel("OrderTopic", "Timeout", message, 14);
}
}
四、基于具體時(shí)間的延遲消息實(shí)現(xiàn)
1. 定時(shí)消息生產(chǎn)者
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class ScheduledMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 發(fā)送延遲指定毫秒數(shù)的消息
*/
public void sendMessageWithDelayMs(String topic, String message, long delayMs) {
// 計(jì)算投遞時(shí)間
long deliverTimeMs = System.currentTimeMillis() + delayMs;
// 創(chuàng)建消息并設(shè)置投遞時(shí)間
Message<String> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_MS, String.valueOf(delayMs))
.setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
System.out.println("延遲毫秒消息發(fā)送成功: " + sendResult);
}
/**
* 發(fā)送指定時(shí)間點(diǎn)投遞的消息
*/
public void sendMessageAtTime(String topic, String message, Date deliverTime) {
long deliverTimeMs = deliverTime.getTime();
// 創(chuàng)建消息并設(shè)置投遞時(shí)間
Message<String> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TIMER_DELIVER_MS, String.valueOf(deliverTimeMs))
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, springMessage);
System.out.println("定時(shí)投遞消息發(fā)送成功: " + sendResult);
}
/**
* 發(fā)送10秒后投遞的消息
*/
public void sendTenSecondsLaterMessage(String topic, String message) {
sendMessageWithDelayMs(topic, message, 10000L);
}
}
五、消費(fèi)者實(shí)現(xiàn)
延遲消息的消費(fèi)者與普通消息消費(fèi)者相同,無(wú)需特殊配置:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Component
@RocketMQMessageListener(
topic = "OrderTopic",
consumerGroup = "delay-message-consumer-group",
selectorExpression = "Timeout"
)
public class OrderTimeoutConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
System.out.println("[" + now + "] 接收到訂單超時(shí)消息: " + message);
// 處理訂單取消邏輯
processOrderCancellation(message);
}
private void processOrderCancellation(String message) {
// 提取訂單ID
String orderId = message.substring(message.indexOf(":") + 2);
System.out.println("執(zhí)行訂單取消操作,訂單ID: " + orderId);
// 這里可以調(diào)用訂單服務(wù)進(jìn)行取消操作
}
}
六、Controller層實(shí)現(xiàn)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping("/api/delay")
public class DelayMessageController {
@Autowired
private DelayLevelProducer delayLevelProducer;
@Autowired
private ScheduledMessageProducer scheduledMessageProducer;
/**
* 發(fā)送基于延遲級(jí)別的消息
*/
@PostMapping("/level")
public String sendByDelayLevel(
@RequestParam String topic,
@RequestParam String tag,
@RequestParam String message,
@RequestParam(defaultValue = "3") int delayLevel) {
delayLevelProducer.sendMessageByDelayLevel(topic, tag, message, delayLevel);
return "延遲級(jí)別消息發(fā)送成功,延遲級(jí)別: " + delayLevel;
}
/**
* 發(fā)送訂單超時(shí)取消消息
*/
@PostMapping("/order/timeout")
public String sendOrderTimeout(@RequestParam String orderId) {
delayLevelProducer.sendOrderTimeoutMessage(orderId);
return "訂單超時(shí)取消消息已發(fā)送,訂單ID: " + orderId;
}
/**
* 發(fā)送延遲指定毫秒的消息
*/
@PostMapping("/milliseconds")
public String sendByDelayMs(
@RequestParam String topic,
@RequestParam String message,
@RequestParam long delayMs) {
scheduledMessageProducer.sendMessageWithDelayMs(topic, message, delayMs);
return "延遲毫秒消息發(fā)送成功,延遲: " + delayMs + "ms";
}
/**
* 發(fā)送指定時(shí)間點(diǎn)的消息
*/
@PostMapping("/scheduled")
public String sendScheduled(
@RequestParam String topic,
@RequestParam String message,
@RequestParam @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date deliverTime) {
scheduledMessageProducer.sendMessageAtTime(topic, message, deliverTime);
return "定時(shí)消息發(fā)送成功,投遞時(shí)間: " + deliverTime;
}
}
七、自定義延遲級(jí)別配置
在Broker的配置文件中可以自定義延遲級(jí)別:
# 在broker.conf文件中添加 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h 5h
重啟Broker使其生效。注意,修改延遲級(jí)別后,所有使用延遲級(jí)別的消息都會(huì)使用新的配置。
八、兩種實(shí)現(xiàn)方式對(duì)比
| 特性 | 基于延遲級(jí)別 | 基于具體時(shí)間 |
|---|---|---|
| 靈活性 | 較低,只能使用預(yù)定義級(jí)別 | 高,可以精確到毫秒 |
| 適用版本 | 全版本支持 | RocketMQ 5.x及以上版本完整支持 |
| 使用場(chǎng)景 | 固定延遲時(shí)間的場(chǎng)景 | 需要精確控制投遞時(shí)間的場(chǎng)景 |
| 配置復(fù)雜度 | 簡(jiǎn)單,無(wú)需額外配置 | 可能需要在Broker端開啟相關(guān)功能 |
九、使用注意事項(xiàng)
延遲精度:
- 延遲消息的投遞時(shí)間不是完全精確的,有一定誤差
- 在高并發(fā)場(chǎng)景下,誤差可能會(huì)增大
版本兼容性:
- 基于具體時(shí)間的延遲消息在RocketMQ 5.x版本支持更完善
- 在低版本中可能需要使用延遲級(jí)別機(jī)制
性能考慮:
- 大量延遲消息可能會(huì)增加Broker的負(fù)擔(dān)
- 對(duì)于長(zhǎng)時(shí)間延遲的消息,考慮使用其他方案(如定時(shí)任務(wù)+消息隊(duì)列組合)
消息可靠性:
- 延遲消息同樣支持持久化,確保Broker重啟后不會(huì)丟失
- 建議開啟消息確認(rèn)機(jī)制確保消息可靠投遞
十、測(cè)試示例
發(fā)送訂單超時(shí)取消消息(延遲15分鐘):
POST /api/delay/order/timeout?orderId=ORDER123456
發(fā)送10秒后投遞的消息:
POST /api/delay/milliseconds?topic=TestTopic&message=HelloDelay&delayMs=10000
發(fā)送指定時(shí)間點(diǎn)的消息:
POST /api/delay/scheduled?topic=TestTopic&message=HelloScheduled&deliverTime=2024-12-25%2000:00:00
通過以上配置和代碼,您可以在SpringBoot項(xiàng)目中輕松實(shí)現(xiàn)基于RocketMQ的延遲消息功能,滿足各種定時(shí)任務(wù)和延遲處理的業(yè)務(wù)需求。
到此這篇關(guān)于SpringBoot+RocketMQ實(shí)現(xiàn)延遲消息的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot RocketMQ 延遲內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 淺談Springboot整合RocketMQ使用心得
- springBoot整合RocketMQ及坑的示例代碼
- SpringBoot整合RocketMQ實(shí)現(xiàn)消息發(fā)送和接收的詳細(xì)步驟
- Springboot RocketMq實(shí)現(xiàn)過程詳解
- 解決SpringBoot整合RocketMQ遇到的坑
- SpringBoot整合RocketMQ實(shí)現(xiàn)發(fā)送同步消息
- Springboot詳解RocketMQ實(shí)現(xiàn)消息發(fā)送與接收流程
- SpringBoot集成RocketMQ的使用示例
- SpringBoot項(xiàng)目嵌入RocketMQ的實(shí)現(xiàn)示例
相關(guān)文章
關(guān)于Java?中?Future?的?get?方法超時(shí)問題
這篇文章主要介紹了Java?中?Future?的?get?方法超時(shí),最常見的理解就是,“超時(shí)以后,當(dāng)前線程繼續(xù)執(zhí)行,線程池里的對(duì)應(yīng)線程中斷”,真的是這樣嗎?本文給大家詳細(xì)介紹,需要的朋友參考下吧2022-06-06
RabbitMQ工作模式中的發(fā)布確認(rèn)模式示例詳解
發(fā)布確認(rèn)模式用于確保消息已經(jīng)被正確地發(fā)送到RabbitMQ服務(wù)器,并被成功接收和持久化,本文通過實(shí)例代碼給大家介紹RabbitMQ工作模式之發(fā)布確認(rèn)模式,感興趣的朋友一起看看吧2025-05-05
定時(shí)任務(wù)@Scheduled用法及其參數(shù)使用
這篇文章主要介紹了定時(shí)任務(wù)@Scheduled用法及其參數(shù)使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
java實(shí)現(xiàn)計(jì)算器加法小程序(圖形化界面)
這篇文章主要介紹了Java實(shí)現(xiàn)圖形化界面的計(jì)算器加法小程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-05-05
MyBatis-Plus 使用枚舉自動(dòng)關(guān)聯(lián)注入
本文主要介紹了MyBatis-Plus 使用枚舉自動(dòng)關(guān)聯(lián)注入,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-06-06

