Redis處理MQ消費(fèi)冪等的實(shí)現(xiàn)示例
本方案參考馬哥短連接項(xiàng)目中的消息冪等處理方案
一.生成者模版代碼
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "message-queue.type", havingValue = "rocketmq")
public class CustomMessageProducer implements MessageQueueProducer {
private final RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.topic}")
private String customTopic;
/**
* 通用的發(fā)送方法,允許自定義消息內(nèi)容和處理邏輯
* @param messagePayload 消息體數(shù)據(jù)
*/
@Override
public void send(Map<String, String> messagePayload) {
// 生成唯一消息鍵,用于標(biāo)識(shí)消息的冪等性
String messageId = UUID.randomUUID().toString();
messagePayload.put("messageId", messageId);
// 構(gòu)建消息
Message<Map<String, String>> message = MessageBuilder
.withPayload(messagePayload)
.setHeader(MessageConst.PROPERTY_KEYS, messageId)
.build();
// 發(fā)送消息并處理結(jié)果
try {
SendResult sendResult = rocketMQTemplate.syncSend(customTopic, message, 2000L);
log.info("消息發(fā)送成功: 狀態(tài)={}, 消息ID={}, 消息鍵={}", sendResult.getSendStatus(), sendResult.getMsgId(), messageId);
} catch (Exception e) {
log.error("消息發(fā)送失敗: 消息內(nèi)容={}", JSON.toJSONString(messagePayload), e);
// 添加自定義的失敗處理邏輯
}
}
}
關(guān)鍵說明
- 冪等性標(biāo)識(shí):在
messagePayload中加入messageId,確保消息唯一性,便于后續(xù)在 Redis 中驗(yàn)證和處理消息冪等。 - 自定義邏輯:為不同的業(yè)務(wù)需求,調(diào)整
customTopic及messagePayload內(nèi)容。 - 異常處理:保留自定義的異常處理接口,以便對(duì)失敗消息進(jìn)行處理或重試。
這樣設(shè)計(jì)有助于實(shí)現(xiàn)通用的消息發(fā)送,只需更改 messagePayload 數(shù)據(jù)結(jié)構(gòu)和自定義處理邏輯即可適應(yīng)不同業(yè)務(wù)。
二.消息冪等處理器模版代碼
package com.example.project.mq.idempotent;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 消息冪等處理器
*/
@Component
@RequiredArgsConstructor
public class MessageIdempotentHandler {
private final StringRedisTemplate stringRedisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "message:idempotent:";
/**
* 判斷消息是否已被處理
*
* @param messageId 消息唯一標(biāo)識(shí)
* @return true 表示消息未處理,可以繼續(xù)處理;false 表示消息已處理,避免重復(fù)消費(fèi)
*/
public boolean isMessageNotProcessed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
// 嘗試設(shè)置新鍵,如果不存在則返回 true 表示未處理,存在則返回 false
return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES));
}
/**
* 標(biāo)記消息處理流程完成
*
* @param messageId 消息唯一標(biāo)識(shí)
*/
public void markAsProcessed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES);
}
/**
* 查詢消息是否已經(jīng)處理完成
*
* @param messageId 消息唯一標(biāo)識(shí)
* @return true 表示消息處理已完成,false 表示未完成
*/
public boolean isProcessingComplete(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1");
}
/**
* 處理異常時(shí)刪除冪等標(biāo)識(shí)
*
* @param messageId 消息唯一標(biāo)識(shí)
*/
public void clearProcessedFlag(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
stringRedisTemplate.delete(key);
}
}
職責(zé)描述
整體職責(zé):
MessageIdempotentHandler主要職責(zé)是保證消息在消費(fèi)過程中只被處理一次,防止重復(fù)消費(fèi)。它借助 Redis 存儲(chǔ)和檢查消息的唯一標(biāo)識(shí)符,以實(shí)現(xiàn)消息的冪等性控制。方法職責(zé):
isMessageNotProcessed:判斷消息是否已處理。此方法嘗試設(shè)置一個(gè)短期過期的標(biāo)識(shí),如果消息尚未被消費(fèi)(即 Redis 中不存在該鍵),則可以繼續(xù)處理,否則表示消息已處理。markAsProcessed:標(biāo)記消息為已完成消費(fèi)。成功消費(fèi)后調(diào)用該方法,改變 Redis 中的鍵值標(biāo)識(shí),標(biāo)記為已完成狀態(tài),避免重復(fù)處理。isProcessingComplete:檢查消息消費(fèi)流程是否完成。消費(fèi)完成后,該鍵值會(huì)被設(shè)置為"1",此方法用于驗(yàn)證該狀態(tài)。clearProcessedFlag:清除冪等標(biāo)識(shí)。在消息消費(fèi)失敗或異常情況下,刪除標(biāo)識(shí)以便消息可重新消費(fèi)。
三.生產(chǎn)者代碼模版
@Override
public void onMessage(Map<String, String> producerMap) {
// 獲取消息的唯一標(biāo)識(shí)符
String keys = producerMap.get("keys");
// 檢查是否已處理過該消息,冪等性控制
if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) {
// 若該消息流程尚未完成
if (messageQueueIdempotentHandler.isAccomplish(keys)) {
return; // 跳過已完成流程的消息
}
throw new ServiceException("消息未完成流程,需要消息隊(duì)列重試");
}
// 業(yè)務(wù)邏輯處理
try {
//調(diào)用業(yè)務(wù)方法代碼
}
} catch (Throwable ex) {
log.error("消費(fèi)異常", ex);
throw ex;
}
// 標(biāo)記消息處理完成
messageQueueIdempotentHandler.setAccomplish(keys);
}
職責(zé)描述
onMessage 模板的職責(zé)是接收并處理消息隊(duì)列中的消息,確保冪等性,并在需要時(shí)拋出異常讓消息隊(duì)列進(jìn)行重試。以下是該方法中關(guān)鍵步驟的職責(zé)和操作說明:
- 消息冪等性校驗(yàn):
- 通過
keys作為唯一標(biāo)識(shí)符來檢查消息是否已被處理過,避免重復(fù)消費(fèi)。 - 使用
messageQueueIdempotentHandler.isMessageProcessed(keys)方法,如果消息已處理,則跳過;如果未完成處理流程,但狀態(tài)為“已完成”,則直接返回;否則拋出ServiceException,讓消息隊(duì)列進(jìn)行重試。
- 通過
- 業(yè)務(wù)邏輯處理:
- 在
try塊中進(jìn)行消息的實(shí)際業(yè)務(wù)處理,調(diào)用相關(guān)業(yè)務(wù)邏輯方法,避免未捕獲的異常導(dǎo)致冪等性標(biāo)記不一致。 - 如果業(yè)務(wù)處理過程中拋出異常,記錄異常日志 (
log.error("消費(fèi)異常", ex)) 并再次拋出異常。
- 在
- 消息處理完成標(biāo)記:
- 在成功完成業(yè)務(wù)邏輯后,調(diào)用
messageQueueIdempotentHandler.setAccomplish(keys)將消息標(biāo)記為已完成。 - 這樣可以確保消息的狀態(tài)被正確更新,即使在重試后也能避免重復(fù)消費(fèi),保證消息的冪等性。
- 在成功完成業(yè)務(wù)邏輯后,調(diào)用
四.消息消費(fèi)流程
假設(shè)消息消費(fèi)應(yīng)用場(chǎng)景如下:
- 接收消息:消息隊(duì)列接收到新消息,獲取消息的唯一標(biāo)識(shí)
messageId。 - 檢查是否已處理:
- 調(diào)用
isMessageNotProcessed檢查 Redis 中是否有該消息的標(biāo)識(shí)。 - 若返回
true,則說明此消息未被處理,進(jìn)入消費(fèi)流程。 - 若返回
false,則說明此消息已處理,直接跳過避免重復(fù)消費(fèi)。
- 調(diào)用
- 消費(fèi)消息:若消息未處理,則執(zhí)行具體的業(yè)務(wù)邏輯。
- 標(biāo)記完成狀態(tài):
- 消費(fèi)完成后,調(diào)用
markAsProcessed,在 Redis 中將該消息標(biāo)記為已完成。
- 消費(fèi)完成后,調(diào)用
- 異常處理:
- 若消費(fèi)過程中拋出異常,調(diào)用
clearProcessedFlag刪除該消息的 Redis 標(biāo)識(shí),允許系統(tǒng)稍后重新嘗試處理該消息。
- 若消費(fèi)過程中拋出異常,調(diào)用
五.總結(jié)
在本方案中,通過使用 Redis 實(shí)現(xiàn) MQ 消息的冪等處理,確保了消息在消費(fèi)過程中只會(huì)被處理一次,避免了重復(fù)消費(fèi)帶來的業(yè)務(wù)異常和資源浪費(fèi)。其主要特點(diǎn)和優(yōu)勢(shì)如下:
- 冪等性保證:使用 Redis 的
setIfAbsent來判斷消息是否已被處理,確保消息在消費(fèi)過程中僅被執(zhí)行一次,避免重復(fù)消費(fèi)的風(fēng)險(xiǎn)。 - 通用性設(shè)計(jì):
- 生產(chǎn)者代碼采用通用模板,通過
UUID生成消息的唯一標(biāo)識(shí)messageId,并將其嵌入到消息體中,保證每條消息的唯一性。 - 消費(fèi)者代碼采用冪等處理器模板
MessageIdempotentHandler,支持多種狀態(tài)檢查和異常處理。
- 生產(chǎn)者代碼采用通用模板,通過
- 細(xì)化的異常處理:在消費(fèi)過程中,若發(fā)生異常,可以及時(shí)刪除 Redis 中的標(biāo)識(shí),確保系統(tǒng)在下次重新消費(fèi)該消息時(shí)不會(huì)被誤認(rèn)為已處理,增強(qiáng)了消息消費(fèi)的健壯性。
- 靈活的業(yè)務(wù)集成:該方案可以根據(jù)不同業(yè)務(wù)需求調(diào)整消息內(nèi)容和自定義的業(yè)務(wù)邏輯處理,適應(yīng)多種場(chǎng)景下的消息冪等消費(fèi)需求。
- 流程化管理:通過
markAsProcessed和clearProcessedFlag實(shí)現(xiàn)了消費(fèi)完成狀態(tài)標(biāo)記和異常重試機(jī)制,確保消費(fèi)的可靠性和一致性。
總的來說,此方案為消息冪等性控制提供了一種可擴(kuò)展、通用且高效的實(shí)現(xiàn)方式,非常適合在高并發(fā)分布式系統(tǒng)中應(yīng)用,能夠有效提高消息消費(fèi)的穩(wěn)定性和安全性
到此這篇關(guān)于Redis處理MQ消費(fèi)冪等的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Redis MQ消費(fèi)冪等內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis在windows環(huán)境下如何啟動(dòng)
這篇文章主要介紹了Redis在windows環(huán)境下如何啟動(dòng)的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-04-04
jwt+redis實(shí)現(xiàn)登錄認(rèn)證的示例代碼
在登錄業(yè)務(wù)代碼中,當(dāng)用戶登錄成功時(shí),生成一個(gè)登錄憑證存儲(chǔ)到redis中,本文主要介紹了jwt+redis實(shí)現(xiàn)登錄認(rèn)證的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03
Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實(shí)現(xiàn)方法
這篇文章主要介紹了Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實(shí)現(xiàn)方法,本篇文章所述的dict在Redis中最主要的作用就是用于維護(hù)Redis數(shù)據(jù)庫(kù)中所有Key、value映射的數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2022-05-05
Redis?存儲(chǔ)對(duì)象信息用?Hash?和String的區(qū)別
這篇文章主要介紹了Redis存儲(chǔ)對(duì)象信息用Hash和String的區(qū)別,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09
使用Redis防止重復(fù)發(fā)送RabbitMQ消息的方法詳解
今天遇到一個(gè)問題,發(fā)送MQ消息的時(shí)候需要保證不會(huì)重復(fù)發(fā)送,注意不是可靠到達(dá),這里保證的是不會(huì)生產(chǎn)多條一樣的消息,所以本文主要介紹了使用Redis防止重復(fù)發(fā)送RabbitMQ消息的方法,需要的朋友可以參考下2025-01-01
Redis內(nèi)存碎片產(chǎn)生原因及Pipeline管道原理解析
這篇文章主要為大家介紹了Redis內(nèi)存碎片產(chǎn)生原因及Pipeline管道原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03

