欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis處理MQ消費冪等的實現(xiàn)示例

 更新時間:2025年05月16日 10:40:51   作者:sjsjsbbsbsn  
本文主要介紹了Redis處理MQ消費冪等的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

本方案參考馬哥短連接項目中的消息冪等處理方案

一.生成者模版代碼

@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "message-queue.type", havingValue = "rocketmq")
public class CustomMessageProducer implements MessageQueueProducer {
    
    private final RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.topic}")
    private String customTopic;

    /**
     * 通用的發(fā)送方法,允許自定義消息內(nèi)容和處理邏輯
     * @param messagePayload 消息體數(shù)據(jù)
     */
    @Override
    public void send(Map<String, String> messagePayload) {
        // 生成唯一消息鍵,用于標識消息的冪等性
        String messageId = UUID.randomUUID().toString();
        messagePayload.put("messageId", messageId);

        // 構(gòu)建消息
        Message<Map<String, String>> message = MessageBuilder
                .withPayload(messagePayload)
                .setHeader(MessageConst.PROPERTY_KEYS, messageId)
                .build();

        // 發(fā)送消息并處理結(jié)果
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(customTopic, message, 2000L);
            log.info("消息發(fā)送成功: 狀態(tài)={}, 消息ID={}, 消息鍵={}", sendResult.getSendStatus(), sendResult.getMsgId(), messageId);
        } catch (Exception e) {
            log.error("消息發(fā)送失敗: 消息內(nèi)容={}", JSON.toJSONString(messagePayload), e);
            // 添加自定義的失敗處理邏輯
        }
    }
}

關(guān)鍵說明

  • 冪等性標識:在 messagePayload 中加入 messageId,確保消息唯一性,便于后續(xù)在 Redis 中驗證和處理消息冪等。
  • 自定義邏輯:為不同的業(yè)務(wù)需求,調(diào)整 customTopic 及 messagePayload 內(nèi)容。
  • 異常處理:保留自定義的異常處理接口,以便對失敗消息進行處理或重試。

這樣設(shè)計有助于實現(xiàn)通用的消息發(fā)送,只需更改 messagePayload 數(shù)據(jù)結(jié)構(gòu)和自定義處理邏輯即可適應(yīng)不同業(yè)務(wù)。

二.消息冪等處理器模版代碼

package com.example.project.mq.idempotent;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 消息冪等處理器
 */
@Component
@RequiredArgsConstructor
public class MessageIdempotentHandler {

    private final StringRedisTemplate stringRedisTemplate;
    private static final String IDEMPOTENT_KEY_PREFIX = "message:idempotent:";

    /**
     * 判斷消息是否已被處理
     *
     * @param messageId 消息唯一標識
     * @return true 表示消息未處理,可以繼續(xù)處理;false 表示消息已處理,避免重復(fù)消費
     */
    public boolean isMessageNotProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        // 嘗試設(shè)置新鍵,如果不存在則返回 true 表示未處理,存在則返回 false
        return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES));
    }

    /**
     * 標記消息處理流程完成
     *
     * @param messageId 消息唯一標識
     */
    public void markAsProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES);
    }

    /**
     * 查詢消息是否已經(jīng)處理完成
     *
     * @param messageId 消息唯一標識
     * @return true 表示消息處理已完成,false 表示未完成
     */
    public boolean isProcessingComplete(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1");
    }

    /**
     * 處理異常時刪除冪等標識
     *
     * @param messageId 消息唯一標識
     */
    public void clearProcessedFlag(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.delete(key);
    }
}

職責(zé)描述

  • 整體職責(zé)
    MessageIdempotentHandler 主要職責(zé)是保證消息在消費過程中只被處理一次,防止重復(fù)消費。它借助 Redis 存儲和檢查消息的唯一標識符,以實現(xiàn)消息的冪等性控制。

  • 方法職責(zé)

    • isMessageNotProcessed:判斷消息是否已處理。此方法嘗試設(shè)置一個短期過期的標識,如果消息尚未被消費(即 Redis 中不存在該鍵),則可以繼續(xù)處理,否則表示消息已處理。
    • markAsProcessed:標記消息為已完成消費。成功消費后調(diào)用該方法,改變 Redis 中的鍵值標識,標記為已完成狀態(tài),避免重復(fù)處理。
    • isProcessingComplete:檢查消息消費流程是否完成。消費完成后,該鍵值會被設(shè)置為 "1",此方法用于驗證該狀態(tài)。
    • clearProcessedFlag:清除冪等標識。在消息消費失敗或異常情況下,刪除標識以便消息可重新消費。

三.生產(chǎn)者代碼模版

@Override
public void onMessage(Map<String, String> producerMap) {
    // 獲取消息的唯一標識符
    String keys = producerMap.get("keys");
    
    // 檢查是否已處理過該消息,冪等性控制
    if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) {
        // 若該消息流程尚未完成
        if (messageQueueIdempotentHandler.isAccomplish(keys)) {
            return; // 跳過已完成流程的消息
        }
        throw new ServiceException("消息未完成流程,需要消息隊列重試");
    }
    
    // 業(yè)務(wù)邏輯處理
    try {
       	//調(diào)用業(yè)務(wù)方法代碼
        }
    } catch (Throwable ex) {
        log.error("消費異常", ex);
        throw ex;
    }
    
    // 標記消息處理完成
    messageQueueIdempotentHandler.setAccomplish(keys);
}


職責(zé)描述

onMessage 模板的職責(zé)是接收并處理消息隊列中的消息,確保冪等性,并在需要時拋出異常讓消息隊列進行重試。以下是該方法中關(guān)鍵步驟的職責(zé)和操作說明:

  • 消息冪等性校驗
    • 通過 keys 作為唯一標識符來檢查消息是否已被處理過,避免重復(fù)消費。
    • 使用 messageQueueIdempotentHandler.isMessageProcessed(keys) 方法,如果消息已處理,則跳過;如果未完成處理流程,但狀態(tài)為“已完成”,則直接返回;否則拋出 ServiceException,讓消息隊列進行重試。
  • 業(yè)務(wù)邏輯處理
    • 在 try 塊中進行消息的實際業(yè)務(wù)處理,調(diào)用相關(guān)業(yè)務(wù)邏輯方法,避免未捕獲的異常導(dǎo)致冪等性標記不一致。
    • 如果業(yè)務(wù)處理過程中拋出異常,記錄異常日志 (log.error("消費異常", ex)) 并再次拋出異常。
  • 消息處理完成標記
    • 在成功完成業(yè)務(wù)邏輯后,調(diào)用 messageQueueIdempotentHandler.setAccomplish(keys) 將消息標記為已完成。
    • 這樣可以確保消息的狀態(tài)被正確更新,即使在重試后也能避免重復(fù)消費,保證消息的冪等性。

四.消息消費流程

假設(shè)消息消費應(yīng)用場景如下:

  • 接收消息:消息隊列接收到新消息,獲取消息的唯一標識 messageId。
  • 檢查是否已處理
    • 調(diào)用 isMessageNotProcessed 檢查 Redis 中是否有該消息的標識。
    • 若返回 true,則說明此消息未被處理,進入消費流程。
    • 若返回 false,則說明此消息已處理,直接跳過避免重復(fù)消費。
  • 消費消息:若消息未處理,則執(zhí)行具體的業(yè)務(wù)邏輯。
  • 標記完成狀態(tài)
    • 消費完成后,調(diào)用 markAsProcessed,在 Redis 中將該消息標記為已完成。
  • 異常處理
    • 若消費過程中拋出異常,調(diào)用 clearProcessedFlag 刪除該消息的 Redis 標識,允許系統(tǒng)稍后重新嘗試處理該消息。

五.總結(jié)

在本方案中,通過使用 Redis 實現(xiàn) MQ 消息的冪等處理,確保了消息在消費過程中只會被處理一次,避免了重復(fù)消費帶來的業(yè)務(wù)異常和資源浪費。其主要特點和優(yōu)勢如下:

  • 冪等性保證:使用 Redis 的 setIfAbsent 來判斷消息是否已被處理,確保消息在消費過程中僅被執(zhí)行一次,避免重復(fù)消費的風(fēng)險。
  • 通用性設(shè)計
    • 生產(chǎn)者代碼采用通用模板,通過 UUID 生成消息的唯一標識 messageId,并將其嵌入到消息體中,保證每條消息的唯一性。
    • 消費者代碼采用冪等處理器模板 MessageIdempotentHandler,支持多種狀態(tài)檢查和異常處理。
  • 細化的異常處理:在消費過程中,若發(fā)生異常,可以及時刪除 Redis 中的標識,確保系統(tǒng)在下次重新消費該消息時不會被誤認為已處理,增強了消息消費的健壯性。
  • 靈活的業(yè)務(wù)集成:該方案可以根據(jù)不同業(yè)務(wù)需求調(diào)整消息內(nèi)容和自定義的業(yè)務(wù)邏輯處理,適應(yīng)多種場景下的消息冪等消費需求。
  • 流程化管理:通過 markAsProcessed 和 clearProcessedFlag 實現(xiàn)了消費完成狀態(tài)標記和異常重試機制,確保消費的可靠性和一致性。

總的來說,此方案為消息冪等性控制提供了一種可擴展、通用且高效的實現(xiàn)方式,非常適合在高并發(fā)分布式系統(tǒng)中應(yīng)用,能夠有效提高消息消費的穩(wěn)定性和安全性

到此這篇關(guān)于Redis處理MQ消費冪等的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)Redis MQ消費冪等內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis中哈希分布不均勻的解決辦法

    Redis中哈希分布不均勻的解決辦法

    這篇文章主要介紹了Redis中哈希分布不均勻的解決辦法的相關(guān)資料,需要的朋友可以參考下
    2021-02-02
  • 詳解Redis主從復(fù)制實踐

    詳解Redis主從復(fù)制實踐

    本文將演示主從復(fù)制如何配置、實現(xiàn)以及實現(xiàn)原理,Redis主從復(fù)制三大策略,全量復(fù)制、部分復(fù)制和立即復(fù)制。
    2021-05-05
  • Redis高效檢索地理位置的原理解析

    Redis高效檢索地理位置的原理解析

    這篇文章主要介紹了Redis是如何高效檢索地理位置,通過geo相關(guān)的命令,可以很容易在redis中存儲和使用經(jīng)緯度坐標信息,具體實現(xiàn)方法跟隨小編一起看看吧
    2021-06-06
  • Redis在windows環(huán)境下如何啟動

    Redis在windows環(huán)境下如何啟動

    這篇文章主要介紹了Redis在windows環(huán)境下如何啟動的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2025-04-04
  • jwt+redis實現(xiàn)登錄認證的示例代碼

    jwt+redis實現(xiàn)登錄認證的示例代碼

    在登錄業(yè)務(wù)代碼中,當用戶登錄成功時,生成一個登錄憑證存儲到redis中,本文主要介紹了jwt+redis實現(xiàn)登錄認證的示例代碼,具有一定的參考價值,感興趣的可以了解一下
    2024-03-03
  • Redis?哨兵模式的實現(xiàn)詳解

    Redis?哨兵模式的實現(xiàn)詳解

    本文主要介紹了Redis?哨兵模式的實現(xiàn)詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實現(xiàn)方法

    Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實現(xiàn)方法

    這篇文章主要介紹了Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實現(xiàn)方法,本篇文章所述的dict在Redis中最主要的作用就是用于維護Redis數(shù)據(jù)庫中所有Key、value映射的數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下
    2022-05-05
  • Redis?存儲對象信息用?Hash?和String的區(qū)別

    Redis?存儲對象信息用?Hash?和String的區(qū)別

    這篇文章主要介紹了Redis存儲對象信息用Hash和String的區(qū)別,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下
    2022-09-09
  • 使用Redis防止重復(fù)發(fā)送RabbitMQ消息的方法詳解

    使用Redis防止重復(fù)發(fā)送RabbitMQ消息的方法詳解

    今天遇到一個問題,發(fā)送MQ消息的時候需要保證不會重復(fù)發(fā)送,注意不是可靠到達,這里保證的是不會生產(chǎn)多條一樣的消息,所以本文主要介紹了使用Redis防止重復(fù)發(fā)送RabbitMQ消息的方法,需要的朋友可以參考下
    2025-01-01
  • Redis內(nèi)存碎片產(chǎn)生原因及Pipeline管道原理解析

    Redis內(nèi)存碎片產(chǎn)生原因及Pipeline管道原理解析

    這篇文章主要為大家介紹了Redis內(nèi)存碎片產(chǎn)生原因及Pipeline管道原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-03-03

最新評論