欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot基于注解實現(xiàn)去重表消息防止重復(fù)消費

 更新時間:2025年05月16日 10:20:22   作者:sjsjsbbsbsn  
本文主要介紹了springboot基于注解實現(xiàn)去重表消息防止重復(fù)消費,通過記錄消息ID、使用分布式鎖和設(shè)置過期時間,可以確保消息只會被處理一次,具有一定的參考價值,感興趣的可以了解一下

1. 背景/問題

在分布式系統(tǒng)中,消息隊列(如RocketMQ、Kafka)的 消息重復(fù)消費 是常見問題,主要原因包括:

  • 網(wǎng)絡(luò)抖動:生產(chǎn)者或消費者因網(wǎng)絡(luò)不穩(wěn)定觸發(fā)消息重發(fā)。
  • 消費者超時:消費者處理時間過長,消息隊列誤判為失敗并重新投遞。
  • 集群故障轉(zhuǎn)移:消費者宕機后,未完成的消息會被其他節(jié)點重新拉取。

重復(fù)消費帶來的問題

  • 業(yè)務(wù)邏輯多次執(zhí)行(如重復(fù)扣款、重復(fù)生成訂單)。
  • 數(shù)據(jù)一致性被破壞(如庫存超賣、積分累加錯誤)。
  • 系統(tǒng)資源浪費,影響性能和穩(wěn)定性。

為了避免這種情況發(fā)生,需要在客戶端實現(xiàn)一些機制來確保消息不會被重復(fù)消費,例如記錄消費者已經(jīng)處理的消息 ID、使用分布式鎖來控制消費進程的唯一性等。這些機制能夠保證消息被成功處理,同時也能夠提高系統(tǒng)的可靠性和穩(wěn)定性。

2. 什么是冪等性

冪等性 是指對同一操作的多次執(zhí)行所產(chǎn)生的影響與一次執(zhí)行的影響相同。

  • 消息消費場景:無論消息被消費多少次,最終結(jié)果應(yīng)與消費一次一致。
  • 實現(xiàn)目標:通過冪等設(shè)計,確保業(yè)務(wù)邏輯的重復(fù)執(zhí)行不會產(chǎn)生副作用。

3. 冪等設(shè)計

核心思路

  • 冪等標識:為每條消息生成唯一標識(如業(yè)務(wù)ID + 消息ID),記錄其處理狀態(tài)。
  • 狀態(tài)管理:通過數(shù)據(jù)庫或Redis維護冪等標識的狀態(tài)(如“消費中”“已消費”)。
  • 過期時間:防止因系統(tǒng)崩潰導(dǎo)致狀態(tài)長期滯留,需設(shè)置合理的超時時間(如10分鐘)。
[消費者接收消息]  
        │  
        ▼  
[解析消息,生成唯一冪等標識]  
        │  
        ▼  
[查詢冪等標識狀態(tài)]  
        │  
┌───────┴───────┐  
│ 存在且已消費  │           [返回成功,丟棄消息]  
└───────┬───────┘  
        │  
┌───────┴───────┐  
│ 存在且消費中  │           [延遲消費,等待重試]  
└───────┬───────┘  
        │  
┌───────┴───────┐  
│   不存在      │  
└───────┬───────┘  
        │  
[設(shè)置冪等標識為“消費中”,并設(shè)置過期時間]  
        │  
        ▼  
[執(zhí)行業(yè)務(wù)邏輯]  
        │  
        ▼  
[業(yè)務(wù)執(zhí)行成功?]  
        │  
┌───────┴───────┐  
│     是        │           [更新標識為“已消費”]  
│               │           [刪除或保留標識]  
└───────┬───────┘  
        │  
┌───────┴───────┐  
│     否        │           [刪除標識,允許重試]  
└───────┬───────┘  
        │  
        ▼  
[流程結(jié)束]  

4.抽象通用冪等組件

消息防重復(fù)消費冪等組件是通用的通常會提取出來也可供其他模塊/服務(wù) 使用

4.1自定義冪等注解

提供了一種通用的冪等注解,并通過 SpEL 的形式生成去重表全局唯一 Key

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {

    /**
     * 設(shè)置防重令牌 Key 前綴
     */
    String keyPrefix() default "";

    /**
     * 通過 SpEL 表達式生成的唯一 Key
     */
    String key();

    /**
     * 設(shè)置防重令牌 Key 過期時間,單位秒,默認 1 小時
     */
    long keyTimeout() default 3600L;
}

4.2. 定義冪等枚舉

冪等需要設(shè)置兩個狀態(tài),消費中和已消費,創(chuàng)建對應(yīng)的枚舉

@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {

    /**
     * 消費中
     */
    CONSUMING("0"),

    /**
     * 已消費
     */
    CONSUMED("1");

    @Getter
    private final String code;

    /**
     * 如果消費狀態(tài)等于消費中,返回失敗
     *
     * @param consumeStatus 消費狀態(tài)
     * @return 是否消費失敗
     */
    public static boolean isError(String consumeStatus) {
        return Objects.equals(CONSUMING.code, consumeStatus);
    }
}

4.3.通過 AOP 的方式進行增強注解

如果說方法上加了注解,會被這段 AOP 代碼以環(huán)繞增強方式執(zhí)行

@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {

    private final StringRedisTemplate stringRedisTemplate;

    private static final String LUA_SCRIPT = """
            local key = KEYS[1]
            local value = ARGV[1]
            local expire_time_ms = ARGV[2]
            return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
            """;

    /**
     * 增強方法標記 {@link NoMQDuplicateConsume} 注解邏輯
     */
    @Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")
    public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {
        NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);
        String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());

        String absentAndGet = stringRedisTemplate.execute(
                RedisScript.of(LUA_SCRIPT, String.class),
                List.of(uniqueKey),
                IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),
                String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout()))
        );

        // 如果不為空證明已經(jīng)有
        if (Objects.nonNull(absentAndGet)) {
            boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);
            log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");
            if (errorFlag) {
                throw new ServiceException(String.format("消息消費者冪等異常,冪等標識:%s", uniqueKey));
            }
            return null;
        }

        Object result;
        try {
            // 執(zhí)行標記了消息隊列防重復(fù)消費注解的方法原邏輯
            result = joinPoint.proceed();

            // 設(shè)置防重令牌 Key 過期時間,單位秒
            stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);
        } catch (Throwable ex) {
            // 刪除冪等 Key,讓消息隊列消費者重試邏輯進行重新消費
            stringRedisTemplate.delete(uniqueKey);
            throw ex;
        }
        return result;
    }

    /**
     * @return 返回自定義防重復(fù)消費注解
     */
    public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
        return targetMethod.getAnnotation(NoMQDuplicateConsume.class);
    }

lua腳本解釋

local key = KEYS[1] # 第一個 Key,即冪等唯一標識 uniqueKey
local value = ARGV[1] # 第一個參數(shù),即初始化冪等消費狀態(tài),為消費中
local expire_time_ms = ARGV[2] # 第二個參數(shù),即冪等 Key 過期時間

return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)

該腳本的主要作用是:在 Redis 中嘗試以 NX 方式設(shè)置一個鍵,即如果鍵不存在,則設(shè)置新值,并返回設(shè)置之前的舊值,同時為該鍵設(shè)置過期時間(以毫秒為單位)。

獲取到 Redis 里面的 Key 值后,可能會有三個流程執(zhí)行:

  • absentAndGet 為空:代表消息是第一次到達,執(zhí)行完 LUA 腳本后,會在 Redis 設(shè)置 Key 的 Value 值為 0,消費中狀態(tài)。
  • absentAndGet 為 0:代表已經(jīng)有相同消息到達并且還沒有處理完,會通過拋異常的形式讓 RocketMQ 重試。
  • absentAndGet 為 1:代表已經(jīng)有相同消息消費完成,返回空表示不執(zhí)行任何處理。

4.4.注冊為 Spring Bean

另外可以看看另一篇基于分布式鎖注解防重復(fù)提交

https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501

public class IdempotentConfiguration {
    /**
     * 防止消息隊列消費者重復(fù)消費消息切面控制器
     */
    @Bean
    public NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {
        return new NoMQDuplicateConsumeAspect(stringRedisTemplate);
    }
}

4.5EL工具類

public class SpELUtil {
    /**
     * 校驗并返回實際使用的 spEL 表達式
     *
     * @param spEl spEL 表達式
     * @return 實際使用的 spEL 表達式
     */
    public static Object parseKey(String spEl, Method method, Object[] contextObj) {
        List<String> spELFlag = ListUtil.of("#", "T(");
        Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst();
        if (optional.isPresent()) {
            return parse(spEl, method, contextObj);
        }
        return spEl;
    }

    /**
     * 轉(zhuǎn)換參數(shù)為字符串
     *
     * @param spEl       spEl 表達式
     * @param contextObj 上下文對象
     * @return 解析的字符串值
     */
    public static Object parse(String spEl, Method method, Object[] contextObj) {
        DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();
        ExpressionParser parser = new SpelExpressionParser();
        Expression exp = parser.parseExpression(spEl);
        String[] params = discoverer.getParameterNames(method);
        StandardEvaluationContext context = new StandardEvaluationContext();
        if (ArrayUtil.isNotEmpty(params)) {
            for (int len = 0; len < params.length; len++) {
                context.setVariable(params[len], contextObj[len]);
            }
        }
        return exp.getValue(context);
    }
}

5.實戰(zhàn)使用

使用天機學(xué)堂項目來進行實戰(zhàn)

5.1寫入common模塊

在這里插入圖片描述

5.2使用

在這里插入圖片描述

直接加上注解就可以

但是實際上這里不存在冪等問題,因為userId和courseId設(shè)置了唯一索引,所以這里不存在冪等性,不需要加上冪等注解

到此這篇關(guān)于springboot基于注解實現(xiàn)去重表消息防止重復(fù)消費的文章就介紹到這了,更多相關(guān)springboot注解防止重復(fù)消費內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中實現(xiàn)OCR識別讀取圖片中的文字

    Java中實現(xiàn)OCR識別讀取圖片中的文字

    圖片內(nèi)容一般無法編輯,如果想要讀取圖片中的文本,我們需要用到OCR工具,本文將介紹如何在Java中實現(xiàn)OCR識別讀取圖片中的文字,文中通過代碼示例介紹的非常詳細,需要的朋友可以參考下
    2024-04-04
  • java調(diào)用openoffice將office系列文檔轉(zhuǎn)換為PDF的示例方法

    java調(diào)用openoffice將office系列文檔轉(zhuǎn)換為PDF的示例方法

    本篇文章主要介紹了java使用openoffice將office系列文檔轉(zhuǎn)換為PDF的示例方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-11-11
  • Java如何使用HTTPclient訪問url獲得數(shù)據(jù)

    Java如何使用HTTPclient訪問url獲得數(shù)據(jù)

    這篇文章主要介紹了Java使用HTTPclient訪問url獲得數(shù)據(jù)的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • 基于Java的Scoket編程

    基于Java的Scoket編程

    本文詳細講解了基于Java的Scoket編程,文中通過示例代碼介紹的非常詳細。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-12-12
  • Java實現(xiàn)圖片比率縮放

    Java實現(xiàn)圖片比率縮放

    這篇文章主要為大家詳細介紹了Java通過Thumbnails實現(xiàn)圖片比率縮放,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-04-04
  • Spring詳解四種加載配置項的方法

    Spring詳解四種加載配置項的方法

    這篇文章主要給大家介紹了關(guān)于springboot加載配置項的四種方式,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用springboot具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2022-06-06
  • Nacos配置中心的配置文件的匹配規(guī)則及說明

    Nacos配置中心的配置文件的匹配規(guī)則及說明

    這篇文章主要介紹了Nacos配置中心的配置文件的匹配規(guī)則及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • Java實現(xiàn)去除文檔陰影的示例代碼

    Java實現(xiàn)去除文檔陰影的示例代碼

    文稿掃描大家用的都比較頻繁、想是各種證件、文件都可以通過掃描文稿功能保存到手機。相比直接拍照,在掃描文稿時,程序會對圖像進行一些矯正。比如去除陰影、修正傾斜、旋轉(zhuǎn)矯正等。進行這些處理后的圖片要更加容易識別。今天就來討論一下去除陰影的操作
    2022-12-12
  • java8中:: 用法示例(JDK8雙冒號用法)

    java8中:: 用法示例(JDK8雙冒號用法)

    這篇文章主要給大家介紹了關(guān)于java8 中的:: 用法(JDK8雙冒號用法)的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學(xué)習(xí)或者使用java8具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • Java都有哪些創(chuàng)建線程的方法

    Java都有哪些創(chuàng)建線程的方法

    這篇文章主要介紹了Java都有哪些創(chuàng)建線程的方法,文章分享Java創(chuàng)建線程得幾種方法及推薦使用哪種方法,下面詳細內(nèi)容需要的小伙伴可以參考一下
    2022-05-05

最新評論