Redis處理MQ消費冪等的實現(xiàn)示例
本方案參考馬哥短連接項目中的消息冪等處理方案
一.生成者模版代碼
@Slf4j @Component @RequiredArgsConstructor @ConditionalOnProperty(name = "message-queue.type", havingValue = "rocketmq") public class CustomMessageProducer implements MessageQueueProducer { private final RocketMQTemplate rocketMQTemplate; @Value("${rocketmq.producer.topic}") private String customTopic; /** * 通用的發(fā)送方法,允許自定義消息內(nèi)容和處理邏輯 * @param messagePayload 消息體數(shù)據(jù) */ @Override public void send(Map<String, String> messagePayload) { // 生成唯一消息鍵,用于標識消息的冪等性 String messageId = UUID.randomUUID().toString(); messagePayload.put("messageId", messageId); // 構(gòu)建消息 Message<Map<String, String>> message = MessageBuilder .withPayload(messagePayload) .setHeader(MessageConst.PROPERTY_KEYS, messageId) .build(); // 發(fā)送消息并處理結(jié)果 try { SendResult sendResult = rocketMQTemplate.syncSend(customTopic, message, 2000L); log.info("消息發(fā)送成功: 狀態(tài)={}, 消息ID={}, 消息鍵={}", sendResult.getSendStatus(), sendResult.getMsgId(), messageId); } catch (Exception e) { log.error("消息發(fā)送失敗: 消息內(nèi)容={}", JSON.toJSONString(messagePayload), e); // 添加自定義的失敗處理邏輯 } } }
關(guān)鍵說明
- 冪等性標識:在
messagePayload
中加入messageId
,確保消息唯一性,便于后續(xù)在 Redis 中驗證和處理消息冪等。 - 自定義邏輯:為不同的業(yè)務(wù)需求,調(diào)整
customTopic
及messagePayload
內(nèi)容。 - 異常處理:保留自定義的異常處理接口,以便對失敗消息進行處理或重試。
這樣設(shè)計有助于實現(xiàn)通用的消息發(fā)送,只需更改 messagePayload
數(shù)據(jù)結(jié)構(gòu)和自定義處理邏輯即可適應(yīng)不同業(yè)務(wù)。
二.消息冪等處理器模版代碼
package com.example.project.mq.idempotent; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.Objects; import java.util.concurrent.TimeUnit; /** * 消息冪等處理器 */ @Component @RequiredArgsConstructor public class MessageIdempotentHandler { private final StringRedisTemplate stringRedisTemplate; private static final String IDEMPOTENT_KEY_PREFIX = "message:idempotent:"; /** * 判斷消息是否已被處理 * * @param messageId 消息唯一標識 * @return true 表示消息未處理,可以繼續(xù)處理;false 表示消息已處理,避免重復(fù)消費 */ public boolean isMessageNotProcessed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; // 嘗試設(shè)置新鍵,如果不存在則返回 true 表示未處理,存在則返回 false return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES)); } /** * 標記消息處理流程完成 * * @param messageId 消息唯一標識 */ public void markAsProcessed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES); } /** * 查詢消息是否已經(jīng)處理完成 * * @param messageId 消息唯一標識 * @return true 表示消息處理已完成,false 表示未完成 */ public boolean isProcessingComplete(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1"); } /** * 處理異常時刪除冪等標識 * * @param messageId 消息唯一標識 */ public void clearProcessedFlag(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; stringRedisTemplate.delete(key); } }
職責(zé)描述
整體職責(zé):
MessageIdempotentHandler
主要職責(zé)是保證消息在消費過程中只被處理一次,防止重復(fù)消費。它借助 Redis 存儲和檢查消息的唯一標識符,以實現(xiàn)消息的冪等性控制。方法職責(zé):
isMessageNotProcessed
:判斷消息是否已處理。此方法嘗試設(shè)置一個短期過期的標識,如果消息尚未被消費(即 Redis 中不存在該鍵),則可以繼續(xù)處理,否則表示消息已處理。markAsProcessed
:標記消息為已完成消費。成功消費后調(diào)用該方法,改變 Redis 中的鍵值標識,標記為已完成狀態(tài),避免重復(fù)處理。isProcessingComplete
:檢查消息消費流程是否完成。消費完成后,該鍵值會被設(shè)置為"1"
,此方法用于驗證該狀態(tài)。clearProcessedFlag
:清除冪等標識。在消息消費失敗或異常情況下,刪除標識以便消息可重新消費。
三.生產(chǎn)者代碼模版
@Override public void onMessage(Map<String, String> producerMap) { // 獲取消息的唯一標識符 String keys = producerMap.get("keys"); // 檢查是否已處理過該消息,冪等性控制 if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) { // 若該消息流程尚未完成 if (messageQueueIdempotentHandler.isAccomplish(keys)) { return; // 跳過已完成流程的消息 } throw new ServiceException("消息未完成流程,需要消息隊列重試"); } // 業(yè)務(wù)邏輯處理 try { //調(diào)用業(yè)務(wù)方法代碼 } } catch (Throwable ex) { log.error("消費異常", ex); throw ex; } // 標記消息處理完成 messageQueueIdempotentHandler.setAccomplish(keys); }
職責(zé)描述
onMessage
模板的職責(zé)是接收并處理消息隊列中的消息,確保冪等性,并在需要時拋出異常讓消息隊列進行重試。以下是該方法中關(guān)鍵步驟的職責(zé)和操作說明:
- 消息冪等性校驗:
- 通過
keys
作為唯一標識符來檢查消息是否已被處理過,避免重復(fù)消費。 - 使用
messageQueueIdempotentHandler.isMessageProcessed(keys)
方法,如果消息已處理,則跳過;如果未完成處理流程,但狀態(tài)為“已完成”,則直接返回;否則拋出ServiceException
,讓消息隊列進行重試。
- 通過
- 業(yè)務(wù)邏輯處理:
- 在
try
塊中進行消息的實際業(yè)務(wù)處理,調(diào)用相關(guān)業(yè)務(wù)邏輯方法,避免未捕獲的異常導(dǎo)致冪等性標記不一致。 - 如果業(yè)務(wù)處理過程中拋出異常,記錄異常日志 (
log.error("消費異常", ex)
) 并再次拋出異常。
- 在
- 消息處理完成標記:
- 在成功完成業(yè)務(wù)邏輯后,調(diào)用
messageQueueIdempotentHandler.setAccomplish(keys)
將消息標記為已完成。 - 這樣可以確保消息的狀態(tài)被正確更新,即使在重試后也能避免重復(fù)消費,保證消息的冪等性。
- 在成功完成業(yè)務(wù)邏輯后,調(diào)用
四.消息消費流程
假設(shè)消息消費應(yīng)用場景如下:
- 接收消息:消息隊列接收到新消息,獲取消息的唯一標識
messageId
。 - 檢查是否已處理:
- 調(diào)用
isMessageNotProcessed
檢查 Redis 中是否有該消息的標識。 - 若返回
true
,則說明此消息未被處理,進入消費流程。 - 若返回
false
,則說明此消息已處理,直接跳過避免重復(fù)消費。
- 調(diào)用
- 消費消息:若消息未處理,則執(zhí)行具體的業(yè)務(wù)邏輯。
- 標記完成狀態(tài):
- 消費完成后,調(diào)用
markAsProcessed
,在 Redis 中將該消息標記為已完成。
- 消費完成后,調(diào)用
- 異常處理:
- 若消費過程中拋出異常,調(diào)用
clearProcessedFlag
刪除該消息的 Redis 標識,允許系統(tǒng)稍后重新嘗試處理該消息。
- 若消費過程中拋出異常,調(diào)用
五.總結(jié)
在本方案中,通過使用 Redis 實現(xiàn) MQ 消息的冪等處理,確保了消息在消費過程中只會被處理一次,避免了重復(fù)消費帶來的業(yè)務(wù)異常和資源浪費。其主要特點和優(yōu)勢如下:
- 冪等性保證:使用 Redis 的
setIfAbsent
來判斷消息是否已被處理,確保消息在消費過程中僅被執(zhí)行一次,避免重復(fù)消費的風(fēng)險。 - 通用性設(shè)計:
- 生產(chǎn)者代碼采用通用模板,通過
UUID
生成消息的唯一標識messageId
,并將其嵌入到消息體中,保證每條消息的唯一性。 - 消費者代碼采用冪等處理器模板
MessageIdempotentHandler
,支持多種狀態(tài)檢查和異常處理。
- 生產(chǎn)者代碼采用通用模板,通過
- 細化的異常處理:在消費過程中,若發(fā)生異常,可以及時刪除 Redis 中的標識,確保系統(tǒng)在下次重新消費該消息時不會被誤認為已處理,增強了消息消費的健壯性。
- 靈活的業(yè)務(wù)集成:該方案可以根據(jù)不同業(yè)務(wù)需求調(diào)整消息內(nèi)容和自定義的業(yè)務(wù)邏輯處理,適應(yīng)多種場景下的消息冪等消費需求。
- 流程化管理:通過
markAsProcessed
和clearProcessedFlag
實現(xiàn)了消費完成狀態(tài)標記和異常重試機制,確保消費的可靠性和一致性。
總的來說,此方案為消息冪等性控制提供了一種可擴展、通用且高效的實現(xiàn)方式,非常適合在高并發(fā)分布式系統(tǒng)中應(yīng)用,能夠有效提高消息消費的穩(wěn)定性和安全性
到此這篇關(guān)于Redis處理MQ消費冪等的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)Redis MQ消費冪等內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實現(xiàn)方法
這篇文章主要介紹了Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實現(xiàn)方法,本篇文章所述的dict在Redis中最主要的作用就是用于維護Redis數(shù)據(jù)庫中所有Key、value映射的數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2022-05-05Redis?存儲對象信息用?Hash?和String的區(qū)別
這篇文章主要介紹了Redis存儲對象信息用Hash和String的區(qū)別,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09使用Redis防止重復(fù)發(fā)送RabbitMQ消息的方法詳解
今天遇到一個問題,發(fā)送MQ消息的時候需要保證不會重復(fù)發(fā)送,注意不是可靠到達,這里保證的是不會生產(chǎn)多條一樣的消息,所以本文主要介紹了使用Redis防止重復(fù)發(fā)送RabbitMQ消息的方法,需要的朋友可以參考下2025-01-01Redis內(nèi)存碎片產(chǎn)生原因及Pipeline管道原理解析
這篇文章主要為大家介紹了Redis內(nèi)存碎片產(chǎn)生原因及Pipeline管道原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-03-03